diff options
author | David Storch <david.storch@10gen.com> | 2015-10-30 13:15:39 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-11-05 11:26:56 -0500 |
commit | 5b34689a84b969affc822e014e44816959ca460b (patch) | |
tree | 29f16d4988fbad4f5c62892fd253cbe90d02b3fa | |
parent | 5edafdbf6ca1effcb18d62c8e53b37544afecfcc (diff) | |
download | mongo-5b34689a84b969affc822e014e44816959ca460b.tar.gz |
SERVER-21218 make mongos forward maxTimeMS on getMore command to mongod
39 files changed, 417 insertions, 43 deletions
diff --git a/jstests/core/getmore_cmd_maxtimems.js b/jstests/core/getmore_cmd_maxtimems.js new file mode 100644 index 00000000000..a0c82667756 --- /dev/null +++ b/jstests/core/getmore_cmd_maxtimems.js @@ -0,0 +1,45 @@ +// Test attaching maxTimeMS to a getMore command. +(function() { + 'use strict'; + + var cmdRes; + var collName = 'getmore_cmd_maxtimems'; + var coll = db[collName]; + coll.drop(); + + for (var i = 0; i < 10; i++) { + assert.writeOK(coll.insert({a: i})); + } + + // Can't attach maxTimeMS to a getMore command for a non-tailable cursor over a non-capped + // collection. + cmdRes = db.runCommand({find: collName, batchSize: 2}); + assert.commandWorked(cmdRes); + cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000}); + assert.commandFailed(cmdRes); + + coll.drop(); + assert.commandWorked(db.createCollection(collName, {capped: true, size: 1024})); + for (var i = 0; i < 10; i++) { + assert.writeOK(coll.insert({a: i})); + } + + // Can't attach maxTimeMS to a getMore command for a non-tailable cursor over a capped + // collection. + cmdRes = db.runCommand({find: collName, batchSize: 2}); + assert.commandWorked(cmdRes); + cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000}); + assert.commandFailed(cmdRes); + + // Can't attach maxTimeMS to a getMore command for a non-awaitData tailable cursor. + cmdRes = db.runCommand({find: collName, batchSize: 2, tailable: true}); + assert.commandWorked(cmdRes); + cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000}); + assert.commandFailed(cmdRes); + + // Can attach maxTimeMS to a getMore command for an awaitData cursor. + cmdRes = db.runCommand({find: collName, batchSize: 2, tailable: true, awaitData: true}); + assert.commandWorked(cmdRes); + cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName, maxTimeMS: 60000}); + assert.commandWorked(cmdRes); +})(); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index c2b00263a41..35c7a6b6bea 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -228,6 +228,12 @@ public: } } + if (request.awaitDataTimeout && !isCursorAwaitData(cursor)) { + Status status(ErrorCodes::BadValue, + "cannot set maxTimeMS on getMore command for a non-awaitData cursor"); + return appendCommandStatus(result, status); + } + // Validate term, if provided. if (request.term) { auto replCoord = repl::ReplicationCoordinator::get(txn); diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 18e821b49b1..d25d0a0a909 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -125,6 +125,7 @@ env.Library( '$BUILD_DIR/mongo/db/namespace_string', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/rpc/command_status', + 'lite_parsed_query', ] ) diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp index 120def57a51..2606236813a 100644 --- a/src/mongo/db/query/getmore_request.cpp +++ b/src/mongo/db/query/getmore_request.cpp @@ -45,7 +45,7 @@ namespace { const char kCollectionField[] = "collection"; const char kBatchSizeField[] = "batchSize"; -const char kMaxTimeMSField[] = "maxTimeMS"; +const char kAwaitDataTimeoutField[] = "maxTimeMS"; const char kTermField[] = "term"; const char kLastKnownCommittedOpTimeField[] = "lastKnownCommittedOpTime"; @@ -58,11 +58,13 @@ GetMoreRequest::GetMoreRequest() : cursorid(0), batchSize(0) {} GetMoreRequest::GetMoreRequest(NamespaceString namespaceString, CursorId id, boost::optional<long long> sizeOfBatch, + boost::optional<Milliseconds> awaitDataTimeout, boost::optional<long long> term, boost::optional<repl::OpTime> lastKnownCommittedOpTime) : nss(std::move(namespaceString)), cursorid(id), batchSize(sizeOfBatch), + awaitDataTimeout(awaitDataTimeout), term(term), lastKnownCommittedOpTime(lastKnownCommittedOpTime) {} @@ -104,6 +106,7 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna // Optional fields. boost::optional<long long> batchSize; + boost::optional<Milliseconds> awaitDataTimeout; boost::optional<long long> term; boost::optional<repl::OpTime> lastKnownCommittedOpTime; @@ -131,10 +134,15 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna } batchSize = el.numberLong(); - } else if (str::equals(fieldName, kMaxTimeMSField)) { - // maxTimeMS is parsed by the command handling code, so we don't repeat the parsing - // here. - continue; + } else if (str::equals(fieldName, kAwaitDataTimeoutField)) { + auto maxAwaitDataTime = LiteParsedQuery::parseMaxTimeMS(el); + if (!maxAwaitDataTime.isOK()) { + return maxAwaitDataTime.getStatus(); + } + + if (maxAwaitDataTime.getValue()) { + awaitDataTimeout = Milliseconds(maxAwaitDataTime.getValue()); + } } else if (str::equals(fieldName, kTermField)) { if (el.type() != BSONType::NumberLong) { return {ErrorCodes::TypeMismatch, @@ -165,8 +173,12 @@ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbna str::stream() << "Field 'collection' missing in: " << cmdObj}; } - GetMoreRequest request( - NamespaceString(*fullns), *cursorid, batchSize, term, lastKnownCommittedOpTime); + GetMoreRequest request(NamespaceString(*fullns), + *cursorid, + batchSize, + awaitDataTimeout, + term, + lastKnownCommittedOpTime); Status validStatus = request.isValid(); if (!validStatus.isOK()) { return validStatus; @@ -185,6 +197,10 @@ BSONObj GetMoreRequest::toBSON() const { builder.append(kBatchSizeField, *batchSize); } + if (awaitDataTimeout) { + builder.append(kAwaitDataTimeoutField, durationCount<Milliseconds>(*awaitDataTimeout)); + } + if (term) { builder.append(kTermField, *term); } diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h index 56a1f2031b0..16455bfb055 100644 --- a/src/mongo/db/query/getmore_request.h +++ b/src/mongo/db/query/getmore_request.h @@ -35,6 +35,7 @@ #include "mongo/base/status_with.h" #include "mongo/db/clientcursor.h" #include "mongo/db/namespace_string.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -52,6 +53,7 @@ struct GetMoreRequest { GetMoreRequest(NamespaceString namespaceString, CursorId id, boost::optional<long long> sizeOfBatch, + boost::optional<Milliseconds> awaitDataTimeout, boost::optional<long long> term, boost::optional<repl::OpTime> lastKnownCommittedOpTime); @@ -76,6 +78,9 @@ struct GetMoreRequest { // as fit within the byte limit. const boost::optional<long long> batchSize; + // The number of milliseconds for which a getMore on a tailable, awaitData query should block. + const boost::optional<Milliseconds> awaitDataTimeout; + // Only internal queries from replication will typically have a term. const boost::optional<long long> term; diff --git a/src/mongo/db/query/getmore_request_test.cpp b/src/mongo/db/query/getmore_request_test.cpp index 0bd7e788033..997310c9645 100644 --- a/src/mongo/db/query/getmore_request_test.cpp +++ b/src/mongo/db/query/getmore_request_test.cpp @@ -175,7 +175,7 @@ TEST(GetMoreRequestTest, parseFromBSONIgnoreDollarPrefixedFields) { ASSERT_EQUALS(CursorId(123), result.getValue().cursorid); } -TEST(GetMoreRequestTest, parseFromBSONIgnoreMaxTimeMS) { +TEST(GetMoreRequestTest, parseFromBSONHasMaxTimeMS) { StatusWith<GetMoreRequest> result = GetMoreRequest::parseFromBSON("db", BSON("getMore" << CursorId(123) << "collection" @@ -183,11 +183,28 @@ TEST(GetMoreRequestTest, parseFromBSONIgnoreMaxTimeMS) { << "maxTimeMS" << 100)); ASSERT_OK(result.getStatus()); ASSERT_EQUALS("db.coll", result.getValue().nss.toString()); + ASSERT(result.getValue().awaitDataTimeout); + ASSERT_EQUALS(100, durationCount<Milliseconds>(*result.getValue().awaitDataTimeout)); ASSERT_EQUALS(CursorId(123), result.getValue().cursorid); } +TEST(GetMoreRequestTest, parseFromBSONHasMaxTimeMSOfZero) { + StatusWith<GetMoreRequest> result = + GetMoreRequest::parseFromBSON("db", + BSON("getMore" << CursorId(123) << "collection" + << "coll" + << "maxTimeMS" << 0)); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS("db.coll", result.getValue().nss.toString()); + ASSERT_EQUALS(CursorId(123), result.getValue().cursorid); + + // Max time of 0 means the same thing as no max time. + ASSERT(!result.getValue().awaitDataTimeout); +} + TEST(GetMoreRequestTest, toBSONHasBatchSize) { - GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, boost::none, boost::none); + GetMoreRequest request( + NamespaceString("testdb.testcoll"), 123, 99, boost::none, boost::none, boost::none); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" << "testcoll" @@ -196,8 +213,12 @@ TEST(GetMoreRequestTest, toBSONHasBatchSize) { } TEST(GetMoreRequestTest, toBSONMissingMatchSize) { - GetMoreRequest request( - NamespaceString("testdb.testcoll"), 123, boost::none, boost::none, boost::none); + GetMoreRequest request(NamespaceString("testdb.testcoll"), + 123, + boost::none, + boost::none, + boost::none, + boost::none); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" << "testcoll"); @@ -205,7 +226,8 @@ TEST(GetMoreRequestTest, toBSONMissingMatchSize) { } TEST(GetMoreRequestTest, toBSONHasTerm) { - GetMoreRequest request(NamespaceString("testdb.testcoll"), 123, 99, 1, boost::none); + GetMoreRequest request( + NamespaceString("testdb.testcoll"), 123, 99, boost::none, 1, boost::none); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" << "testcoll" @@ -214,8 +236,12 @@ TEST(GetMoreRequestTest, toBSONHasTerm) { } TEST(GetMoreRequestTest, toBSONHasCommitLevel) { - GetMoreRequest request( - NamespaceString("testdb.testcoll"), 123, 99, 1, repl::OpTime(Timestamp(0, 10), 2)); + GetMoreRequest request(NamespaceString("testdb.testcoll"), + 123, + 99, + boost::none, + 1, + repl::OpTime(Timestamp(0, 10), 2)); BSONObj requestObj = request.toBSON(); BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" @@ -225,4 +251,18 @@ TEST(GetMoreRequestTest, toBSONHasCommitLevel) { ASSERT_EQ(requestObj, expectedRequest); } +TEST(GetMoreRequestTest, toBSONHasMaxTimeMS) { + GetMoreRequest request(NamespaceString("testdb.testcoll"), + 123, + boost::none, + Milliseconds(789), + boost::none, + boost::none); + BSONObj requestObj = request.toBSON(); + BSONObj expectedRequest = BSON("getMore" << CursorId(123) << "collection" + << "testcoll" + << "maxTimeMS" << 789); + ASSERT_EQ(requestObj, expectedRequest); +} + } // namespace diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp index b86b0945363..a4f9696b7a0 100644 --- a/src/mongo/db/query/lite_parsed_query.cpp +++ b/src/mongo/db/query/lite_parsed_query.cpp @@ -666,17 +666,7 @@ Status LiteParsedQuery::validate() const { } // static -StatusWith<int> LiteParsedQuery::parseMaxTimeMSCommand(const BSONObj& cmdObj) { - return parseMaxTimeMS(cmdObj[cmdOptionMaxTimeMS]); -} - -// static -StatusWith<int> LiteParsedQuery::parseMaxTimeMSQuery(const BSONObj& queryObj) { - return parseMaxTimeMS(queryObj[queryOptionMaxTimeMS]); -} - -// static -StatusWith<int> LiteParsedQuery::parseMaxTimeMS(const BSONElement& maxTimeMSElt) { +StatusWith<int> LiteParsedQuery::parseMaxTimeMS(BSONElement maxTimeMSElt) { if (!maxTimeMSElt.eoo() && !maxTimeMSElt.isNumber()) { return StatusWith<int>( ErrorCodes::BadValue, diff --git a/src/mongo/db/query/lite_parsed_query.h b/src/mongo/db/query/lite_parsed_query.h index af139ef1a7d..0287ad159e6 100644 --- a/src/mongo/db/query/lite_parsed_query.h +++ b/src/mongo/db/query/lite_parsed_query.h @@ -119,16 +119,9 @@ public: void asFindCommand(BSONObjBuilder* cmdBuilder) const; /** - * Helper functions to parse maxTimeMS from a command object. Returns the contained value, - * or an error on parsing fail. When passed an EOO-type element, returns 0 (special value - * for "allow to run indefinitely"). + * Parses maxTimeMS from the BSONElement containing its value. */ - static StatusWith<int> parseMaxTimeMSCommand(const BSONObj& cmdObj); - - /** - * Same as parseMaxTimeMSCommand, but for a query object. - */ - static StatusWith<int> parseMaxTimeMSQuery(const BSONObj& queryObj); + static StatusWith<int> parseMaxTimeMS(BSONElement maxTimeMSElt); /** * Helper function to identify text search sort key @@ -298,8 +291,6 @@ public: static StatusWith<std::unique_ptr<LiteParsedQuery>> fromLegacyQueryMessage( const QueryMessage& qm); - static StatusWith<int> parseMaxTimeMS(const BSONElement& maxTimeMSElt); - private: LiteParsedQuery(NamespaceString nss); diff --git a/src/mongo/db/query/lite_parsed_query_test.cpp b/src/mongo/db/query/lite_parsed_query_test.cpp index d6c36a877c8..d2c1cd92842 100644 --- a/src/mongo/db/query/lite_parsed_query_test.cpp +++ b/src/mongo/db/query/lite_parsed_query_test.cpp @@ -1216,5 +1216,39 @@ TEST(LiteParsedQueryTest, ParseFromCommandForbidExtraOption) { ASSERT_NOT_OK(result.getStatus()); } +TEST(LiteParsedQueryTest, ParseMaxTimeMSStringValueFails) { + BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << "foo"); + ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS])); +} + +TEST(LiteParsedQueryTest, ParseMaxTimeMSNonIntegralValueFails) { + BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 100.3); + ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS])); +} + +TEST(LiteParsedQueryTest, ParseMaxTimeMSOutOfRangeDoubleFails) { + BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 1e200); + ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS])); +} + +TEST(LiteParsedQueryTest, ParseMaxTimeMSNegativeValueFails) { + BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << -400); + ASSERT_NOT_OK(LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS])); +} + +TEST(LiteParsedQueryTest, ParseMaxTimeMSZeroSucceeds) { + BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 0); + auto maxTime = LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]); + ASSERT_OK(maxTime); + ASSERT_EQ(maxTime.getValue(), 0); +} + +TEST(LiteParsedQueryTest, ParseMaxTimeMSPositiveInRangeSucceeds) { + BSONObj maxTimeObj = BSON(LiteParsedQuery::cmdOptionMaxTimeMS << 300); + auto maxTime = LiteParsedQuery::parseMaxTimeMS(maxTimeObj[LiteParsedQuery::cmdOptionMaxTimeMS]); + ASSERT_OK(maxTime); + ASSERT_EQ(maxTime.getValue(), 300); +} + } // namespace mongo } // namespace diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index eaf0394350b..a4b1efc62c6 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -216,7 +216,8 @@ public: LOG(0) << "CMD: movechunk: " << cmdObj; - StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSCommand(cmdObj); + StatusWith<int> maxTimeMS = + LiteParsedQuery::parseMaxTimeMS(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]); if (!maxTimeMS.isOK()) { errmsg = maxTimeMS.getStatus().reason(); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index cdada8c55f8..eaaf759156e 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -111,6 +111,18 @@ bool AsyncResultsMerger::remotesExhausted_inlock() { return true; } +Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (!_params.isTailable || !_params.isAwaitData) { + return Status(ErrorCodes::BadValue, + "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); + } + + _awaitDataTimeout = awaitDataTimeout; + return Status::OK(); +} + bool AsyncResultsMerger::ready() { stdx::lock_guard<stdx::mutex> lk(_mutex); return ready_inlock(); @@ -268,10 +280,12 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { adjustedBatchSize = *_params.batchSize - remote.fetchedCount; } - cmdObj = - GetMoreRequest( - _params.nsString, *remote.cursorId, adjustedBatchSize, boost::none, boost::none) - .toBSON(); + cmdObj = GetMoreRequest(_params.nsString, + *remote.cursorId, + adjustedBatchSize, + _awaitDataTimeout, + boost::none, + boost::none).toBSON(); } else { // Do the first time shard host resolution. invariant(_params.readPreference); diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 9fc9b36960b..40e309e2273 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -40,6 +40,7 @@ #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -91,6 +92,15 @@ public: bool remotesExhausted(); /** + * Sets the maxTimeMS value that the ARM should forward with any internally issued getMore + * requests. + * + * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if + * the cursor is not tailable + awaitData). + */ + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); + + /** * Returns true if there is no need to schedule remote work in order to take the next action. * This means that either * --there is a buffered result which we can return, @@ -352,6 +362,8 @@ private: // boost::none. bool _eofNext = false; + boost::optional<Milliseconds> _awaitDataTimeout; + // // Killing // diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 7dd3d6c2e2f..7886f92a713 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -117,6 +117,7 @@ protected: params.batchSize = getMoreBatchSize ? getMoreBatchSize : lpq->getBatchSize(); params.skip = lpq->getSkip(); params.isTailable = lpq->isTailable(); + params.isAwaitData = lpq->isAwaitData(); params.isAllowPartialResults = lpq->isAllowPartialResults(); for (const auto& shardId : shardIds) { @@ -1307,6 +1308,65 @@ TEST_F(AsyncResultsMergerTest, RetryOnHostUnreachableAllowPartialResults) { ASSERT_TRUE(arm->ready()); } +TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { + BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); + makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; + responses.emplace_back(_nss, CursorId(123), batch1); + scheduleNetworkResponses(std::move(responses), CursorResponse::ResponseType::InitialResponse); + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT(!unittest::assertGet(arm->nextReady())); + + ASSERT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); + + ASSERT_FALSE(arm->ready()); + readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // Pending getMore request should include maxTimeMS. + BSONObj expectedCmdObj = BSON("getMore" << CursorId(123) << "collection" + << "testcoll" + << "maxTimeMS" << 789); + ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj); + + responses.clear(); + std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; + responses.emplace_back(_nss, CursorId(0), batch2); + scheduleNetworkResponses(std::move(responses), + CursorResponse::ResponseType::SubsequentResponse); + executor->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); + ASSERT_TRUE(arm->ready()); + ASSERT(!unittest::assertGet(arm->nextReady())); +} + +TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { + BSONObj findCmd = fromjson("{find: 'testcoll'}"); + makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); + auto killEvent = arm->kill(); + executor->waitForEvent(killEvent); +} + +TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutAwaitDataCantHaveMaxTime) { + BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); + makeCursorFromFindCmd(findCmd, {kTestShardIds[0]}); + ASSERT_NOT_OK(arm->setAwaitDataTimeout(Milliseconds(789))); + auto killEvent = arm->kill(); + executor->waitForEvent(killEvent); +} } // namespace diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index d0e6d8cf811..55f2412f7c8 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -31,6 +31,7 @@ #include <boost/optional.hpp> #include "mongo/db/jsobj.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -95,6 +96,15 @@ public: * Returns whether or not all the remote cursors underlying this cursor have been exhausted. */ virtual bool remotesExhausted() = 0; + + /** + * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore + * requests. + * + * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if + * the cursor is not tailable + awaitData). + */ + virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index bf6d301d0d2..a80562042dc 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -119,6 +119,10 @@ bool ClusterClientCursorImpl::remotesExhausted() { return _root->remotesExhausted(); } +Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return _root->setAwaitDataTimeout(awaitDataTimeout); +} + std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( executor::TaskExecutor* executor, ClusterClientCursorParams&& params) { const auto skip = params.skip; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 76c3f4c6708..1a8766f9005 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -108,6 +108,8 @@ public: bool remotesExhausted() final; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + private: /** * Constructs a cluster client cursor. diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp index 96f5e2944a2..50b47f47817 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp @@ -129,6 +129,19 @@ TEST(ClusterClientCursorImpl, RemotesExhausted) { ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL); } +TEST(ClusterClientCursorImpl, ForwardsAwaitDataTimeout) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStagePtr = mockStage.get(); + ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); + + ClusterClientCursorImpl cursor(std::move(mockStage)); + ASSERT_OK(cursor.setAwaitDataTimeout(Milliseconds(789))); + + auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); + ASSERT_OK(awaitDataTimeout.getStatus()); + ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue())); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 97e999c6939..4cf9418fc7f 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -32,6 +32,8 @@ #include "mongo/s/query/cluster_client_cursor_mock.h" +#include "mongo/util/assert_util.h" + namespace mongo { ClusterClientCursorMock::ClusterClientCursorMock(stdx::function<void(void)> killCallback) @@ -91,4 +93,8 @@ void ClusterClientCursorMock::queueError(Status status) { _resultsQueue.push({status}); } +Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index a8515ba61ce..67efae2181a 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -53,6 +53,8 @@ public: void queueResult(const BSONObj& obj) final; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + /** * Returns true unless marked as having non-exhausted remote cursors via * markRemotesNotExhausted(). diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 0e497fd105a..5de1b912b72 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -132,6 +132,9 @@ struct ClusterClientCursorParams { // Whether this cursor is tailing a capped collection. bool isTailable = false; + // Whether this cursor has the awaitData option set. + bool isAwaitData = false; + // Read preference for where to target the query. This value is only set if initial shard host // targeting is necessary and not used if using externally prepared cursor ids. boost::optional<ReadPreferenceSetting> readPreference; diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 641bb29c30d..96ab2eb88dc 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -148,6 +148,11 @@ bool ClusterCursorManager::PinnedCursor::remotesExhausted() { return _cursor->remotesExhausted(); } +Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + invariant(_cursor); + return _cursor->setAwaitDataTimeout(awaitDataTimeout); +} + void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index b26796628c2..7c04414f15c 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -189,6 +189,15 @@ public: */ bool remotesExhausted(); + /** + * Sets the maxTimeMS value that the cursor should forward with any internally issued + * getMore requests. A cursor must be owned. + * + * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. + * if the cursor is not tailable + awaitData). + */ + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); + private: // ClusterCursorManager is a friend so that its methods can call the PinnedCursor // constructor declared below, which is private to prevent clients from calling it directly. diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 01562d8e835..34374af7a64 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -220,6 +220,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, params.batchSize = query.getParsed().getEffectiveBatchSize(); params.skip = query.getParsed().getSkip(); params.isTailable = query.getParsed().isTailable(); + params.isAwaitData = query.getParsed().isAwaitData(); params.isAllowPartialResults = query.getParsed().isAllowPartialResults(); // This is the batchSize passed to each subsequent getMore command issued by the cursor. We @@ -406,6 +407,13 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn, } invariant(request.cursorid == pinnedCursor.getValue().getCursorId()); + if (request.awaitDataTimeout) { + auto status = pinnedCursor.getValue().setAwaitDataTimeout(*request.awaitDataTimeout); + if (!status.isOK()) { + return status; + } + } + std::vector<BSONObj> batch; int bytesBuffered = 0; long long batchSize = request.batchSize.value_or(0); diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 68e12b7f1f6..deb4bf34f9f 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -33,6 +33,7 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" +#include "mongo/util/time_support.h" namespace mongo { @@ -71,6 +72,15 @@ public: */ virtual bool remotesExhausted() = 0; + /** + * Sets the maxTimeMS value that the cursor should forward with any internally issued getMore + * requests. + * + * Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if + * the cursor is not tailable + awaitData). + */ + virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index c2f584f0358..9a9f77fbf00 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -63,4 +63,8 @@ bool RouterStageLimit::remotesExhausted() { return getChildStage()->remotesExhausted(); } +Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 26ced69b24b..0db06c30c3b 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -45,6 +45,8 @@ public: bool remotesExhausted() final; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + private: long long _limit; diff --git a/src/mongo/s/query/router_stage_limit_test.cpp b/src/mongo/s/query/router_stage_limit_test.cpp index b85d82bef2d..fd8fa335e7e 100644 --- a/src/mongo/s/query/router_stage_limit_test.cpp +++ b/src/mongo/s/query/router_stage_limit_test.cpp @@ -164,6 +164,19 @@ TEST(RouterStageLimitTest, LimitStageRemotesExhausted) { ASSERT_TRUE(limitStage->remotesExhausted()); } +TEST(RouterStageLimitTest, ForwardsAwaitDataTimeout) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStagePtr = mockStage.get(); + ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); + + auto limitStage = stdx::make_unique<RouterStageLimit>(std::move(mockStage), 100); + ASSERT_OK(limitStage->setAwaitDataTimeout(Milliseconds(789))); + + auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); + ASSERT_OK(awaitDataTimeout.getStatus()); + ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue())); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 2e75954d434..527bc0f0063 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -64,4 +64,8 @@ bool RouterStageMerge::remotesExhausted() { return _arm.remotesExhausted(); } +Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return _arm.setAwaitDataTimeout(awaitDataTimeout); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index d84b7172e47..a146c66f346 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -51,6 +51,8 @@ public: bool remotesExhausted() final; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + private: // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index 5af0d0d470a..179635bbb08 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -68,4 +68,17 @@ bool RouterStageMock::remotesExhausted() { return _remotesExhausted; } +Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + _awaitDataTimeout = awaitDataTimeout; + return Status::OK(); +} + +StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() { + if (!_awaitDataTimeout) { + return Status(ErrorCodes::BadValue, "no awaitData timeout set"); + } + + return *_awaitDataTimeout; +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index 7cb31ce9745..b83e2879096 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -49,6 +49,8 @@ public: bool remotesExhausted() final; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + /** * Queues a BSONObj to be returned. */ @@ -70,9 +72,15 @@ public: */ void markRemotesExhausted(); + /** + * Gets the timeout for awaitData, or an error if none was set. + */ + StatusWith<Milliseconds> getAwaitDataTimeout(); + private: std::queue<StatusWith<boost::optional<BSONObj>>> _resultsQueue; bool _remotesExhausted = false; + boost::optional<Milliseconds> _awaitDataTimeout; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 8708e603c7b..16e6f9407a4 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -65,4 +65,8 @@ bool RouterStageRemoveSortKey::remotesExhausted() { return getChildStage()->remotesExhausted(); } +Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index 3cdae152db7..6ef60012a4d 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -46,6 +46,8 @@ public: void kill() final; bool remotesExhausted() final; + + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp index 668aec8e978..255bcb3cba4 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey_test.cpp @@ -149,6 +149,19 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) { ASSERT_TRUE(sortKeyStage->remotesExhausted()); } +TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStagePtr = mockStage.get(); + ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); + + auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(std::move(mockStage)); + ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789))); + + auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); + ASSERT_OK(awaitDataTimeout.getStatus()); + ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue())); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 5fc866b2db0..536c3d173a2 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -64,4 +64,8 @@ bool RouterStageSkip::remotesExhausted() { return getChildStage()->remotesExhausted(); } +Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { + return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index b29e7fb20bd..35994d31e3e 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -45,6 +45,8 @@ public: bool remotesExhausted() final; + Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + private: long long _skip; diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp index 6e03e2d3301..7aca1f600bd 100644 --- a/src/mongo/s/query/router_stage_skip_test.cpp +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -200,6 +200,19 @@ TEST(RouterStageSkipTest, SkipStageRemotesExhausted) { ASSERT_TRUE(skipStage->remotesExhausted()); } +TEST(RouterStageSkipTest, ForwardsAwaitDataTimeout) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + auto mockStagePtr = mockStage.get(); + ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus()); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + ASSERT_OK(skipStage->setAwaitDataTimeout(Milliseconds(789))); + + auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout(); + ASSERT_OK(awaitDataTimeout.getStatus()); + ASSERT_EQ(789, durationCount<Milliseconds>(awaitDataTimeout.getValue())); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 1f711ad861a..79f63fc463b 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -264,7 +264,8 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions); // Parse "$maxTimeMS". - StatusWith<int> maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query); + StatusWith<int> maxTimeMS = + LiteParsedQuery::parseMaxTimeMS(q.query[LiteParsedQuery::queryOptionMaxTimeMS]); uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK()); if (_isSystemIndexes(q.ns) && doShardedIndexQuery(txn, request, qSpec)) { @@ -576,7 +577,8 @@ void Strategy::getMore(OperationContext* txn, Request& request) { if (ntoreturn) { batchSize = abs(ntoreturn); } - GetMoreRequest getMoreRequest(NamespaceString(ns), id, batchSize, boost::none, boost::none); + GetMoreRequest getMoreRequest( + NamespaceString(ns), id, batchSize, boost::none, boost::none, boost::none); auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest); if (cursorResponse == ErrorCodes::CursorNotFound) { diff --git a/src/mongo/shell/bench.cpp b/src/mongo/shell/bench.cpp index 54ce09d3dc8..50eeb355be7 100644 --- a/src/mongo/shell/bench.cpp +++ b/src/mongo/shell/bench.cpp @@ -398,6 +398,7 @@ int runQueryWithReadCommands(DBClientBase* conn, GetMoreRequest getMoreRequest(lpq->nss(), cursorResponse.getCursorId(), lpq->getBatchSize(), + boost::none, // maxTimeMS boost::none, // term boost::none); // lastKnownCommittedOpTime BSONObj getMoreCommandResult; |