summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2015-10-30 13:15:39 -0400
committerDavid Storch <david.storch@10gen.com>2015-11-05 11:26:56 -0500
commit5b34689a84b969affc822e014e44816959ca460b (patch)
tree29f16d4988fbad4f5c62892fd253cbe90d02b3fa
parent5edafdbf6ca1effcb18d62c8e53b37544afecfcc (diff)
downloadmongo-5b34689a84b969affc822e014e44816959ca460b.tar.gz
SERVER-21218 make mongos forward maxTimeMS on getMore command to mongod
-rw-r--r--jstests/core/getmore_cmd_maxtimems.js45
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp6
-rw-r--r--src/mongo/db/query/SConscript1
-rw-r--r--src/mongo/db/query/getmore_request.cpp30
-rw-r--r--src/mongo/db/query/getmore_request.h5
-rw-r--r--src/mongo/db/query/getmore_request_test.cpp54
-rw-r--r--src/mongo/db/query/lite_parsed_query.cpp12
-rw-r--r--src/mongo/db/query/lite_parsed_query.h13
-rw-r--r--src/mongo/db/query/lite_parsed_query_test.cpp34
-rw-r--r--src/mongo/s/commands/cluster_move_chunk_cmd.cpp3
-rw-r--r--src/mongo/s/query/async_results_merger.cpp22
-rw-r--r--src/mongo/s/query/async_results_merger.h12
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp60
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h10
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp13
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp6
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h3
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h9
-rw-r--r--src/mongo/s/query/cluster_find.cpp8
-rw-r--r--src/mongo/s/query/router_exec_stage.h10
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp4
-rw-r--r--src/mongo/s/query/router_stage_limit.h2
-rw-r--r--src/mongo/s/query/router_stage_limit_test.cpp13
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp4
-rw-r--r--src/mongo/s/query/router_stage_merge.h2
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp13
-rw-r--r--src/mongo/s/query/router_stage_mock.h8
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp4
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h2
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey_test.cpp13
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp4
-rw-r--r--src/mongo/s/query/router_stage_skip.h2
-rw-r--r--src/mongo/s/query/router_stage_skip_test.cpp13
-rw-r--r--src/mongo/s/strategy.cpp6
-rw-r--r--src/mongo/shell/bench.cpp1
39 files changed, 417 insertions, 43 deletions
diff --git a/jstests/core/getmore_cmd_maxtimems.js b/jstests/core/getmore_cmd_maxtimems.js
new file mode 100644
index 00000000000..a0c82667756
--- /dev/null
+++ b/jstests/core/getmore_cmd_maxtimems.js
@@ -0,0 +1,45 @@
+// Test attaching maxTimeMS to a getMore command.
+(function() {
+ 'use strict';
+
+ var cmdRes;
+ var collName = 'getmore_cmd_maxtimems';
+ var coll = db[collName];
+ coll.drop();
+
+ for (var i = 0; i < 10; i++) {
+ assert.writeOK(coll.insert({a: i}));
+ }
+
+ // Can't attach maxTimeMS to a getMore command for a non-tailable cursor over a non-capped
+ // collection.
+ cmdRes = db.runCommand({find: collName, batchSize: 2});
+ assert.commandWorked(cmdRes);
+ cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000});
+ assert.commandFailed(cmdRes);
+
+ coll.drop();
+ assert.commandWorked(db.createCollection(collName, {capped: true, size: 1024}));
+ for (var i = 0; i < 10; i++) {
+ assert.writeOK(coll.insert({a: i}));
+ }
+
+ // Can't attach maxTimeMS to a getMore command for a non-tailable cursor over a capped
+ // collection.
+ cmdRes = db.runCommand({find: collName, batchSize: 2});
+ assert.commandWorked(cmdRes);
+ cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000});
+ assert.commandFailed(cmdRes);
+
+ // Can't attach maxTimeMS to a getMore command for a non-awaitData tailable cursor.
+ cmdRes = db.runCommand({find: collName, batchSize: 2, tailable: true});
+ assert.commandWorked(cmdRes);
+ cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000});
+ assert.commandFailed(cmdRes);
+
+ // Can attach maxTimeMS to a getMore command for an awaitData cursor.
+ cmdRes = db.runCommand({find: collName, batchSize: 2, tailable: true, awaitData: true});
+ assert.commandWorked(cmdRes);
+ cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000});
+ assert.commandWorked(cmdRes);
+})();
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index c2b00263a41..35c7a6b6bea 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -228,6 +228,12 @@ public:
}
}
+ if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) {
+ Status status(ErrorCodes::BadValue,
+ "cannot set maxTimeMS on getMore command for a non-awaitData cursor");
+ return appendCommandStatus(result, status);
+ }
+
// Validate term, if provided.
if (request.term) {
auto replCoord = repl::ReplicationCoordinator::get(txn);
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 18e821b49b1..d25d0a0a909 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -125,6 +125,7 @@ env.Library(
'$BUILD_DIR/mongo/db/namespace_string',
'$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/rpc/command_status',
+ 'lite_parsed_query',
]
)
diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp
index 120def57a51..2606236813a 100644
--- a/src/mongo/db/query/getmore_request.cpp
+++ b/src/mongo/db/query/getmore_request.cpp
@@ -45,7 +45,7 @@ namespace {
const char kCollectionField[] = "collection";
const char kBatchSizeField[] = "batchSize";
-const char kMaxTimeMSField[] = "maxTimeMS";
+const char kAwaitDataTimeoutField[] = "maxTimeMS";
const char kTermField[] = "term";
const char kLastKnownCommittedOpTimeField[] = "lastKnownCommittedOpTime";
@@ -58,11 +58,13 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {}
GetMoreRequest::GetMoreRequest(NamespaceString namespaceString,
CursorId id,
boost::optional<long long> sizeOfBatch,
+ boost::optional<Milliseconds> awaitDataTimeout,
boost::optional<long long> term,
boost::optional<repl::OpTime> lastKnownCommittedOpTime)
: nss(std::move(namespaceString)),
cursorid(id),
batchSize(sizeOfBatch),
+ awaitDataTimeout(awaitDataTimeout),
term(term),
lastKnownCommittedOpTime(lastKnownCommittedOpTime) {}
@@ -104,6 +106,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
// Optional fields.
boost::optional<long long> batchSize;
+ boost::optional<Milliseconds> awaitDataTimeout;
boost::optional<long long> term;
boost::optional<repl::OpTime> lastKnownCommittedOpTime;
@@ -131,10 +134,15 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
}
batchSize = el.numberLong();
- } else if (str::equals(fieldName, kMaxTimeMSField)) {
- // maxTimeMS is parsed by the command handling code, so we don't repeat the parsing
- // here.
- continue;
+ } else if (str::equals(fieldName, kAwaitDataTimeoutField)) {
+ auto maxAwaitDataTime = LiteParsedQuery::parseMaxTimeMS(el);
+ if (!maxAwaitDataTime.isOK()) {
+ return maxAwaitDataTime.getStatus();
+ }
+
+ if (maxAwaitDataTime.getValue()) {
+ awaitDataTimeout = Milliseconds(maxAwaitDataTime.getValue());
+ }
} else if (str::equals(fieldName, kTermField)) {
if (el.type() != BSONType::NumberLong) {
return {ErrorCodes::TypeMismatch,
@@ -165,8 +173,12 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna
str::stream() << "Field 'collection' missing in: " << cmdObj};
}
- GetMoreRequest request(
- NamespaceString(*fullns), *cursorid, batchSize, term, lastKnownCommittedOpTime);
+ GetMoreRequest request(NamespaceString(*fullns),
+ *cursorid,
+ batchSize,
+ awaitDataTimeout,
+ term,
+ lastKnownCommittedOpTime);
Status validStatus = request.isValid();
if (!validStatus.isOK()) {
return validStatus;
@@ -185,6 +197,10 @@ BSONObj GetMoreRequest::toBSON() const {
builder.append(kBatchSizeField, *batchSize);
}
+ if (awaitDataTimeout) {
+ builder.append(kAwaitDataTimeoutField, durationCount<Milliseconds>(*awaitDataTimeout));
+ }
+
if (term) {
builder.append(kTermField, *term);
}
diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h
index 56a1f2031b0..16455bfb055 100644
--- a/src/mongo/db/query/getmore_request.h
+++ b/src/mongo/db/query/getmore_request.h
@@ -35,6 +35,7 @@
#include "mongo/base/status_with.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -52,6 +53,7 @@ struct GetMoreRequest {
GetMoreRequest(NamespaceString namespaceString,
CursorId id,
boost::optional<long long> sizeOfBatch,
+ boost::optional<Milliseconds> awaitDataTimeout,
boost::optional<long long> term,
boost::optional<repl::OpTime> lastKnownCommittedOpTime);
@@ -76,6 +78,9 @@ struct GetMoreRequest {
// as fit within the byte limit.
const boost::optional<long long> batchSize;
+ // The number of milliseconds for which a getMore on a tailable, awaitData query should block.
+ const boost::optional<Milliseconds> awaitDataTimeout;
+
// Only internal queries from replication will typically have a term.
const boost::optional<long long> term;
diff --git a/src/mongo/db/query/getmore_request_test.cpp b/src/mongo/db/query/getmore_request_test.cpp
index 0bd7e788033..997310c9645 100644
--- a/src/mongo/db/query/getmore_request_test.cpp
+++ b/src/mongo/db/query/getmore_request_test.cpp
@@ -175,7 +175,7 @@ TEST(GetMoreRequestTest, parseFromBSONIgnoreDollarPrefixedFields) {
ASSERT_EQUALS(CursorId(123), result.getValue().cursorid);
}
-TEST(GetMoreRequestTest, parseFromBSONIgnoreMaxTimeMS) {
+TEST(GetMoreRequestTest, parseFromBSONHasMaxTimeMS) {
StatusWith<GetMoreRequest> result =
GetMoreRequest::parseFromBSON("db",
BSON("getMore" << CursorId(123) << "collection"
@@ -183,11 +183,28 @@ TEST(GetMoreRequestTest, parseFromBSONIgnoreMaxTimeMS) {
<< "maxTimeMS" << 100));
ASSERT_OK(result.getStatus());
ASSERT_EQUALS("db.coll", result.getValue().nss.toString());
+ ASSERT(result.getValue().awaitDataTimeout);
+ ASSERT_EQUALS(100, durationCount<Milliseconds>(*result.getValue().awaitDataTimeout));
ASSERT_EQUALS(CursorId(123), result.getValue().cursorid);
}
+TEST(GetMoreRequestTest, parseFromBSONHasMaxTimeMSOfZero) {
+ StatusWith<GetMoreRequest> result =
+ GetMoreRequest::parseFromBSON("db",
+ BSON("getMore" << CursorId(123) << "collection"
+ << "coll"
+ << "maxTimeMS" << 0));
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS("db.coll", result.getValue().nss.toString());
+ ASSERT_EQUALS(CursorId(123), result.getValue().cursorid);
+
+ // Max time of 0 means the same thing as no max time.
+ ASSERT(!result.getValue().awaitDataTimeout);
+}
+
TEST(GetMoreRequestTest, toBSONHasBatchSize) {
- GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, boost::none, boost::none);
+ GetMoreRequest request(
+ NamespaceString("testdb.testcoll"), 123, 99, boost::none, boost::none, boost::none);
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
<< "testcoll"
@@ -196,8 +213,12 @@ TEST(GetMoreRequestTest, toBSONHasBatchSize) {
}
TEST(GetMoreRequestTest, toBSONMissingMatchSize) {
- GetMoreRequest request(
- NamespaceString("testdb.testcoll"), 123, boost::none, boost::none, boost::none);
+ GetMoreRequest request(NamespaceString("testdb.testcoll"),
+ 123,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
<< "testcoll");
@@ -205,7 +226,8 @@ TEST(GetMoreRequestTest, toBSONMissingMatchSize) {
}
TEST(GetMoreRequestTest, toBSONHasTerm) {
- GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, 1, boost::none);
+ GetMoreRequest request(
+ NamespaceString("testdb.testcoll"), 123, 99, boost::none, 1, boost::none);
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
<< "testcoll"
@@ -214,8 +236,12 @@ TEST(GetMoreRequestTest, toBSONHasTerm) {
}
TEST(GetMoreRequestTest, toBSONHasCommitLevel) {
- GetMoreRequest request(
- NamespaceString("testdb.testcoll"), 123, 99, 1, repl::OpTime(Timestamp(0, 10), 2));
+ GetMoreRequest request(NamespaceString("testdb.testcoll"),
+ 123,
+ 99,
+ boost::none,
+ 1,
+ repl::OpTime(Timestamp(0, 10), 2));
BSONObj requestObj = request.toBSON();
BSONObj expectedRequest =
BSON("getMore" << CursorId(123) << "collection"
@@ -225,4 +251,18 @@ TEST(GetMoreRequestTest, toBSONHasCommitLevel) {
ASSERT_EQ(requestObj, expectedRequest);
}
+TEST(GetMoreRequestTest, toBSONHasMaxTimeMS) {
+ GetMoreRequest request(NamespaceString("testdb.testcoll"),
+ 123,
+ boost::none,
+ Milliseconds(789),
+ boost::none,
+ boost::none);
+ BSONObj requestObj = request.toBSON();
+ BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection"
+ << "testcoll"
+ << "maxTimeMS" << 789);
+ ASSERT_EQ(requestObj, expectedRequest);
+}
+
} // namespace
diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp
index b86b0945363..a4f9696b7a0 100644
--- a/src/mongo/db/query/lite_parsed_query.cpp
+++ b/src/mongo/db/query/lite_parsed_query.cpp
@@ -666,17 +666,7 @@ Status LiteParsedQuery::validate() const {
}
// static
-StatusWith<int> LiteParsedQuery::parseMaxTimeMSCommand(const BSONObj& cmdObj) {
- return parseMaxTimeMS(cmdObj[cmdOptionMaxTimeMS]);
-}
-
-// static
-StatusWith<int> LiteParsedQuery::parseMaxTimeMSQuery(const BSONObj& queryObj) {
- return parseMaxTimeMS(queryObj[queryOptionMaxTimeMS]);
-}
-
-// static
-StatusWith<int> LiteParsedQuery::parseMaxTimeMS(const BSONElement& maxTimeMSElt) {
+StatusWith<int> LiteParsedQuery::parseMaxTimeMS(BSONElement maxTimeMSElt) {
if (!maxTimeMSElt.eoo() && !maxTimeMSElt.isNumber()) {
return StatusWith<int>(
ErrorCodes::BadValue,
diff --git a/src/mongo/db/query/lite_parsed_query.h b/src/mongo/db/query/lite_parsed_query.h
index af139ef1a7d..0287ad159e6 100644
--- a/src/mongo/db/query/lite_parsed_query.h
+++ b/src/mongo/db/query/lite_parsed_query.h
@@ -119,16 +119,9 @@ public:
void asFindCommand(BSONObjBuilder* cmdBuilder) const;
/**
- * Helper functions to parse maxTimeMS from a command object. Returns the contained value,
- * or an error on parsing fail. When passed an EOO-type element, returns 0 (special value
- * for "allow to run indefinitely").
+ * Parses maxTimeMS from the BSONElement containing its value.
*/
- static StatusWith<int> parseMaxTimeMSCommand(const BSONObj& cmdObj);
-
- /**
- * Same as parseMaxTimeMSCommand, but for a query object.
- */
- static StatusWith<int> parseMaxTimeMSQuery(const BSONObj& queryObj);
+ static StatusWith<int> parseMaxTimeMS(BSONElement maxTimeMSElt);
/**
* Helper function to identify text search sort key
@@ -298,8 +291,6 @@ public:
static StatusWith<std::unique_ptr<LiteParsedQuery>> fromLegacyQueryMessage(
const QueryMessage& qm);
- static StatusWith<int> parseMaxTimeMS(const BSONElement& maxTimeMSElt);
-
private:
LiteParsedQuery(NamespaceString nss);
diff --git a/src/mongo/db/query/lite_parsed_query_test.cpp b/src/mongo/db/query/lite_parsed_query_test.cpp
index d6c36a877c8..d2c1cd92842 100644
--- a/src/mongo/db/query/lite_parsed_query_test.cpp
+++ b/src/mongo/db/query/lite_parsed_query_test.cpp
@@ -1216,5 +1216,39 @@ TEST(LiteParsedQueryTest, ParseFromCommandForbidExtraOption) {
ASSERT_NOT_OK(result.getStatus());
}
+TEST(LiteParsedQueryTest, ParseMaxTimeMSStringValueFails) {
+ BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << "foo");
+ ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]));
+}
+
+TEST(LiteParsedQueryTest, ParseMaxTimeMSNonIntegralValueFails) {
+ BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 100.3);
+ ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]));
+}
+
+TEST(LiteParsedQueryTest, ParseMaxTimeMSOutOfRangeDoubleFails) {
+ BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 1e200);
+ ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]));
+}
+
+TEST(LiteParsedQueryTest, ParseMaxTimeMSNegativeValueFails) {
+ BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << -400);
+ ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]));
+}
+
+TEST(LiteParsedQueryTest, ParseMaxTimeMSZeroSucceeds) {
+ BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 0);
+ auto maxTime = LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
+ ASSERT_OK(maxTime);
+ ASSERT_EQ(maxTime.getValue(), 0);
+}
+
+TEST(LiteParsedQueryTest, ParseMaxTimeMSPositiveInRangeSucceeds) {
+ BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 300);
+ auto maxTime = LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
+ ASSERT_OK(maxTime);
+ ASSERT_EQ(maxTime.getValue(), 300);
+}
+
} // namespace mongo
} // namespace
diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
index eaf0394350b..a4b1efc62c6 100644
--- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
+++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp
@@ -216,7 +216,8 @@ public:
LOG(0) << "CMD: movechunk: " << cmdObj;
- StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSCommand(cmdObj);
+ StatusWith<int> maxTimeMS =
+ LiteParsedQuery::parseMaxTimeMS(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]);
if (!maxTimeMS.isOK()) {
errmsg = maxTimeMS.getStatus().reason();
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index cdada8c55f8..eaaf759156e 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -111,6 +111,18 @@ bool AsyncResultsMerger::remotesExhausted_inlock() {
return true;
}
+Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (!_params.isTailable || !_params.isAwaitData) {
+ return Status(ErrorCodes::BadValue,
+ "maxTimeMS can only be used with getMore for tailable, awaitData cursors");
+ }
+
+ _awaitDataTimeout = awaitDataTimeout;
+ return Status::OK();
+}
+
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return ready_inlock();
@@ -268,10 +280,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
adjustedBatchSize = *_params.batchSize - remote.fetchedCount;
}
- cmdObj =
- GetMoreRequest(
- _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none)
- .toBSON();
+ cmdObj = GetMoreRequest(_params.nsString,
+ *remote.cursorId,
+ adjustedBatchSize,
+ _awaitDataTimeout,
+ boost::none,
+ boost::none).toBSON();
} else {
// Do the first time shard host resolution.
invariant(_params.readPreference);
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 9fc9b36960b..40e309e2273 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -40,6 +40,7 @@
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -91,6 +92,15 @@ public:
bool remotesExhausted();
/**
+ * Sets the maxTimeMS value that the ARM should forward with any internally issued getMore
+ * requests.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
+ * the cursor is not tailable + awaitData).
+ */
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
+
+ /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
@@ -352,6 +362,8 @@ private:
// boost::none.
bool _eofNext = false;
+ boost::optional<Milliseconds> _awaitDataTimeout;
+
//
// Killing
//
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 7dd3d6c2e2f..7886f92a713 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -117,6 +117,7 @@ protected:
params.batchSize = getMoreBatchSize ? getMoreBatchSize : lpq->getBatchSize();
params.skip = lpq->getSkip();
params.isTailable = lpq->isTailable();
+ params.isAwaitData = lpq->isAwaitData();
params.isAllowPartialResults = lpq->isAllowPartialResults();
for (const auto& shardId : shardIds) {
@@ -1307,6 +1308,65 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) {
ASSERT_TRUE(arm->ready());
}
+TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
+ responses.emplace_back(_nss, CursorId(123), batch1);
+ scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+
+ ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
+
+ ASSERT_FALSE(arm->ready());
+ readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // Pending getMore request should include maxTimeMS.
+ BSONObj expectedCmdObj = BSON("getMore" << CursorId(123) << "collection"
+ << "testcoll"
+ << "maxTimeMS" << 789);
+ ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);
+
+ responses.clear();
+ std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
+ responses.emplace_back(_nss, CursorId(0), batch2);
+ scheduleNetworkResponses(std::move(responses),
+ CursorResponse::ResponseType::SubsequentResponse);
+ executor->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
+ ASSERT_TRUE(arm->ready());
+ ASSERT(!unittest::assertGet(arm->nextReady()));
+}
+
+TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) {
+ BSONObj findCmd = fromjson("{find: 'testcoll'}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
+ auto killEvent = arm->kill();
+ executor->waitForEvent(killEvent);
+}
+
+TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
+ makeCursorFromFindCmd(findCmd, {kTestShardIds[0]});
+ ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789)));
+ auto killEvent = arm->kill();
+ executor->waitForEvent(killEvent);
+}
} // namespace
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index d0e6d8cf811..55f2412f7c8 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
#include "mongo/db/jsobj.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -95,6 +96,15 @@ public:
* Returns whether or not all the remote cursors underlying this cursor have been exhausted.
*/
virtual bool remotesExhausted() = 0;
+
+ /**
+ * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore
+ * requests.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
+ * the cursor is not tailable + awaitData).
+ */
+ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index bf6d301d0d2..a80562042dc 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -119,6 +119,10 @@ bool ClusterClientCursorImpl::remotesExhausted() {
return _root->remotesExhausted();
}
+Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return _root->setAwaitDataTimeout(awaitDataTimeout);
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
executor::TaskExecutor* executor, ClusterClientCursorParams&& params) {
const auto skip = params.skip;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 76c3f4c6708..1a8766f9005 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -108,6 +108,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
/**
* Constructs a cluster client cursor.
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 96f5e2944a2..50b47f47817 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -129,6 +129,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL);
}
+TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ ClusterClientCursorImpl cursor(std::move(mockStage));
+ ASSERT_OK(cursor.setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 97e999c6939..4cf9418fc7f 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -32,6 +32,8 @@
#include "mongo/s/query/cluster_client_cursor_mock.h"
+#include "mongo/util/assert_util.h"
+
namespace mongo {
ClusterClientCursorMock::ClusterClientCursorMock(stdx::function<void(void)> killCallback)
@@ -91,4 +93,8 @@ void ClusterClientCursorMock::queueError(Status status) {
_resultsQueue.push({status});
}
+Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index a8515ba61ce..67efae2181a 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -53,6 +53,8 @@ public:
void queueResult(const BSONObj& obj) final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
/**
* Returns true unless marked as having non-exhausted remote cursors via
* markRemotesNotExhausted().
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 0e497fd105a..5de1b912b72 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -132,6 +132,9 @@ struct ClusterClientCursorParams {
// Whether this cursor is tailing a capped collection.
bool isTailable = false;
+ // Whether this cursor has the awaitData option set.
+ bool isAwaitData = false;
+
// Read preference for where to target the query. This value is only set if initial shard host
// targeting is necessary and not used if using externally prepared cursor ids.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 641bb29c30d..96ab2eb88dc 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -148,6 +148,11 @@ bool ClusterCursorManager::PinnedCursor::remotesExhausted() {
return _cursor->remotesExhausted();
}
+Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ invariant(_cursor);
+ return _cursor->setAwaitDataTimeout(awaitDataTimeout);
+}
+
void ClusterCursorManager::PinnedCursor::returnAndKillCursor() {
invariant(_cursor);
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index b26796628c2..7c04414f15c 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -189,6 +189,15 @@ public:
*/
bool remotesExhausted();
+ /**
+ * Sets the maxTimeMS value that the cursor should forward with any internally issued
+ * getMore requests. A cursor must be owned.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e.
+ * if the cursor is not tailable + awaitData).
+ */
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
+
private:
// ClusterCursorManager is a friend so that its methods can call the PinnedCursor
// constructor declared below, which is private to prevent clients from calling it directly.
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 01562d8e835..34374af7a64 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -220,6 +220,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
params.batchSize = query.getParsed().getEffectiveBatchSize();
params.skip = query.getParsed().getSkip();
params.isTailable = query.getParsed().isTailable();
+ params.isAwaitData = query.getParsed().isAwaitData();
params.isAllowPartialResults = query.getParsed().isAllowPartialResults();
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
@@ -406,6 +407,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
}
invariant(request.cursorid == pinnedCursor.getValue().getCursorId());
+ if (request.awaitDataTimeout) {
+ auto status = pinnedCursor.getValue().setAwaitDataTimeout(*request.awaitDataTimeout);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
std::vector<BSONObj> batch;
int bytesBuffered = 0;
long long batchSize = request.batchSize.value_or(0);
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 68e12b7f1f6..deb4bf34f9f 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -33,6 +33,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/util/time_support.h"
namespace mongo {
@@ -71,6 +72,15 @@ public:
*/
virtual bool remotesExhausted() = 0;
+ /**
+ * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore
+ * requests.
+ *
+ * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
+ * the cursor is not tailable + awaitData).
+ */
+ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+
protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index c2f584f0358..9a9f77fbf00 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -63,4 +63,8 @@ bool RouterStageLimit::remotesExhausted() {
return getChildStage()->remotesExhausted();
}
+Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 26ced69b24b..0db06c30c3b 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -45,6 +45,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp
index b85d82bef2d..fd8fa335e7e 100644
--- a/src/mongo/s/query/router_stage_limit_test.cpp
+++ b/src/mongo/s/query/router_stage_limit_test.cpp
@@ -164,6 +164,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) {
ASSERT_TRUE(limitStage->remotesExhausted());
}
+TEST(RouterStageLimitTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100);
+ ASSERT_OK(limitStage->setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 2e75954d434..527bc0f0063 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -64,4 +64,8 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
+Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return _arm.setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index d84b7172e47..a146c66f346 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -51,6 +51,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index 5af0d0d470a..179635bbb08 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -68,4 +68,17 @@ bool RouterStageMock::remotesExhausted() {
return _remotesExhausted;
}
+Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ _awaitDataTimeout = awaitDataTimeout;
+ return Status::OK();
+}
+
+StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() {
+ if (!_awaitDataTimeout) {
+ return Status(ErrorCodes::BadValue, "no awaitData timeout set");
+ }
+
+ return *_awaitDataTimeout;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index 7cb31ce9745..b83e2879096 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -49,6 +49,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
/**
* Queues a BSONObj to be returned.
*/
@@ -70,9 +72,15 @@ public:
*/
void markRemotesExhausted();
+ /**
+ * Gets the timeout for awaitData, or an error if none was set.
+ */
+ StatusWith<Milliseconds> getAwaitDataTimeout();
+
private:
std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue;
bool _remotesExhausted = false;
+ boost::optional<Milliseconds> _awaitDataTimeout;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 8708e603c7b..16e6f9407a4 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -65,4 +65,8 @@ bool RouterStageRemoveSortKey::remotesExhausted() {
return getChildStage()->remotesExhausted();
}
+Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index 3cdae152db7..6ef60012a4d 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -46,6 +46,8 @@ public:
void kill() final;
bool remotesExhausted() final;
+
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
index 668aec8e978..255bcb3cba4 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
@@ -149,6 +149,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
ASSERT_TRUE(sortKeyStage->remotesExhausted());
}
+TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage));
+ ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 5fc866b2db0..536c3d173a2 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -64,4 +64,8 @@ bool RouterStageSkip::remotesExhausted() {
return getChildStage()->remotesExhausted();
}
+Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index b29e7fb20bd..35994d31e3e 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -45,6 +45,8 @@ public:
bool remotesExhausted() final;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
long long _skip;
diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp
index 6e03e2d3301..7aca1f600bd 100644
--- a/src/mongo/s/query/router_stage_skip_test.cpp
+++ b/src/mongo/s/query/router_stage_skip_test.cpp
@@ -200,6 +200,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) {
ASSERT_TRUE(skipStage->remotesExhausted());
}
+TEST(RouterStageSkipTest, ForwardsAwaitDataTimeout) {
+ auto mockStage = stdx::make_unique<RouterStageMock>();
+ auto mockStagePtr = mockStage.get();
+ ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
+
+ auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3);
+ ASSERT_OK(skipStage->setAwaitDataTimeout(Milliseconds(789)));
+
+ auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
+ ASSERT_OK(awaitDataTimeout.getStatus());
+ ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue()));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 1f711ad861a..79f63fc463b 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -264,7 +264,8 @@ void Strategy::queryOp(OperationContext* txn, Request& request) {
QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions);
// Parse "$maxTimeMS".
- StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query);
+ StatusWith<int> maxTimeMS =
+ LiteParsedQuery::parseMaxTimeMS(q.query[LiteParsedQuery::queryOptionMaxTimeMS]);
uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK());
if (_isSystemIndexes(q.ns) && doShardedIndexQuery(txn, request, qSpec)) {
@@ -576,7 +577,8 @@ void Strategy::getMore(OperationContext* txn, Request& request) {
if (ntoreturn) {
batchSize = abs(ntoreturn);
}
- GetMoreRequest getMoreRequest(NamespaceString(ns), id, batchSize, boost::none, boost::none);
+ GetMoreRequest getMoreRequest(
+ NamespaceString(ns), id, batchSize, boost::none, boost::none, boost::none);
auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest);
if (cursorResponse == ErrorCodes::CursorNotFound) {
diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp
index 54ce09d3dc8..50eeb355be7 100644
--- a/src/mongo/shell/bench.cpp
+++ b/src/mongo/shell/bench.cpp
@@ -398,6 +398,7 @@ int runQueryWithReadCommands(DBClientBase* conn,
GetMoreRequest getMoreRequest(lpq->nss(),
cursorResponse.getCursorId(),
lpq->getBatchSize(),
+ boost::none, // maxTimeMS
boost::none, // term
boost::none); // lastKnownCommittedOpTime
BSONObj getMoreCommandResult;