diff options
author | David Storch <david.storch@10gen.com> | 2015-07-30 20:19:53 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-07-30 20:19:53 -0400 |
commit | 0549805e713327519702856deeb66b16cb727b99 (patch) | |
tree | 0c72ee4e7b46b914e7119c3843d0446a21cf11b3 /src/mongo | |
parent | 3feccd233307a8a4fc81e312d072046f8cf2f499 (diff) | |
download | mongo-0549805e713327519702856deeb66b16cb727b99.tar.gz |
SERVER-19355 add skip support to the new find/getMore path in mongos
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/client/remote_command_runner_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/exec/idhack.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/query/canonical_query.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/query/lite_parsed_query.cpp | 93 | ||||
-rw-r--r-- | src/mongo/db/query/lite_parsed_query.h | 43 | ||||
-rw-r--r-- | src/mongo/db/query/lite_parsed_query_test.cpp | 159 | ||||
-rw-r--r-- | src/mongo/db/query/planner_analysis.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 50 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_limit.h | 4 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.h | 4 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_skip.cpp | 63 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_skip.h | 52 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_skip_test.cpp | 153 |
19 files changed, 552 insertions, 121 deletions
diff --git a/src/mongo/client/remote_command_runner_impl.cpp b/src/mongo/client/remote_command_runner_impl.cpp index 0a839b471cd..67e960b44bd 100644 --- a/src/mongo/client/remote_command_runner_impl.cpp +++ b/src/mongo/client/remote_command_runner_impl.cpp @@ -134,7 +134,7 @@ Status runDownconvertedFindCommand(DBClientConnection* conn, query.snapshot(); } int nToReturn = lpq->getLimit().value_or(0) * -1; - int nToSkip = lpq->getSkip(); + int nToSkip = lpq->getSkip().value_or(0); const BSONObj* fieldsToReturn = &lpq->getProj(); int queryOptions = lpq->getOptions(); int batchSize = lpq->getBatchSize().value_or(0); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index eac7ec1746e..a8c6f0c6a17 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -219,7 +219,7 @@ public: // Fill out curop information. long long ntoreturn = lpq->getBatchSize().value_or(0); - beginQueryOp(txn, nss, cmdObj, ntoreturn, lpq->getSkip()); + beginQueryOp(txn, nss, cmdObj, ntoreturn, lpq->getSkip().value_or(0)); // 1b) Finish the parsing step by using the LiteParsedQuery to create a CanonicalQuery. WhereCallbackReal whereCallback(txn, nss.db()); diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index afc7507a54f..a66ad20edda 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -242,7 +242,7 @@ void IDHackStage::doInvalidate(OperationContext* txn, const RecordId& dl, Invali // static bool IDHackStage::supportsQuery(const CanonicalQuery& query) { return !query.getParsed().showRecordId() && query.getParsed().getHint().isEmpty() && - 0 == query.getParsed().getSkip() && + !query.getParsed().getSkip() && CanonicalQuery::isSimpleIdQuery(query.getParsed().getFilter()) && !query.getParsed().isTailable(); } diff --git a/src/mongo/db/query/canonical_query.cpp b/src/mongo/db/query/canonical_query.cpp index b290ae74d24..7b780cccd6d 100644 --- a/src/mongo/db/query/canonical_query.cpp +++ b/src/mongo/db/query/canonical_query.cpp @@ -570,7 +570,9 @@ std::string CanonicalQuery::toString() const { ss << " limit=" << *_pq->getLimit(); } - ss << " skip=" << _pq->getSkip() << "\n"; + if (_pq->getSkip()) { + ss << " skip=" << *_pq->getSkip() << "\n"; + } // The expression tree puts an endl on for us. ss << "Tree: " << _root->toString(); @@ -582,7 +584,7 @@ std::string CanonicalQuery::toString() const { std::string CanonicalQuery::toStringShort() const { str::stream ss; ss << "query: " << _pq->getFilter().toString() << " sort: " << _pq->getSort().toString() - << " projection: " << _pq->getProj().toString() << " skip: " << _pq->getSkip(); + << " projection: " << _pq->getProj().toString(); if (_pq->getBatchSize()) { ss << " batchSize: " << *_pq->getBatchSize(); @@ -592,6 +594,10 @@ std::string CanonicalQuery::toStringShort() const { ss << " limit: " << *_pq->getLimit(); } + if (_pq->getSkip()) { + ss << " skip: " << *_pq->getSkip(); + } + return ss; } diff --git a/src/mongo/db/query/lite_parsed_query.cpp b/src/mongo/db/query/lite_parsed_query.cpp index 05aade1991e..b82ba259257 100644 --- a/src/mongo/db/query/lite_parsed_query.cpp +++ b/src/mongo/db/query/lite_parsed_query.cpp @@ -169,7 +169,10 @@ StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeFromFindCommand(Nam return Status(ErrorCodes::BadValue, "skip value must be non-negative"); } - pq->_skip = skip; + // A skip value of 0 means that there is no skip. + if (skip) { + pq->_skip = skip; + } } else if (str::equals(fieldName, kLimitField)) { if (!el.isNumber()) { str::stream ss; @@ -179,11 +182,14 @@ StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeFromFindCommand(Nam } long long limit = el.numberLong(); - if (limit <= 0) { - return Status(ErrorCodes::BadValue, "limit value must be positive"); + if (limit < 0) { + return Status(ErrorCodes::BadValue, "limit value must be non-negative"); } - pq->_limit = limit; + // A limit value of 0 means that there is no limit. + if (limit) { + pq->_limit = limit; + } } else if (str::equals(fieldName, kBatchSizeField)) { if (!el.isNumber()) { str::stream ss; @@ -391,31 +397,63 @@ StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeAsOpQuery(Namespace } // static -StatusWith<unique_ptr<LiteParsedQuery>> LiteParsedQuery::makeAsFindCmd( +std::unique_ptr<LiteParsedQuery> LiteParsedQuery::makeAsFindCmd( NamespaceString nss, - const BSONObj& query, + const BSONObj& filter, + const BSONObj& projection, const BSONObj& sort, - boost::optional<long long> limit) { + const BSONObj& hint, + boost::optional<long long> skip, + boost::optional<long long> limit, + boost::optional<long long> batchSize, + bool wantMore, + bool isExplain, + const std::string& comment, + int maxScan, + int maxTimeMS, + const BSONObj& min, + const BSONObj& max, + bool returnKey, + bool showRecordId, + bool isSnapshot, + bool hasReadPref, + bool isTailable, + bool isSlaveOk, + bool isOplogReplay, + bool isNoCursorTimeout, + bool isAwaitData, + bool isPartial) { unique_ptr<LiteParsedQuery> pq(new LiteParsedQuery(std::move(nss))); - pq->_fromCommand = true; - pq->_filter = query.getOwned(); - pq->_sort = sort.getOwned(); - if (limit) { - if (*limit <= 0) { - return Status(ErrorCodes::BadValue, "limit value must be positive"); - } - - pq->_limit = std::move(limit); - } + pq->_filter = filter; + pq->_proj = projection; + pq->_sort = sort; + pq->_hint = hint; - pq->addMetaProjection(); + pq->_skip = skip; + pq->_limit = limit; + pq->_batchSize = batchSize; + pq->_wantMore = wantMore; - Status validateStatus = pq->validateFindCmd(); - if (!validateStatus.isOK()) { - return validateStatus; - } + pq->_explain = isExplain; + pq->_comment = comment; + pq->_maxScan = maxScan; + pq->_maxTimeMS = maxTimeMS; + + pq->_min = min; + pq->_max = max; + + pq->_returnKey = returnKey; + pq->_showRecordId = showRecordId; + pq->_snapshot = isSnapshot; + pq->_hasReadPref = hasReadPref; + pq->_tailable = isTailable; + pq->_slaveOk = isSlaveOk; + pq->_oplogReplay = isOplogReplay; + pq->_noCursorTimeout = isNoCursorTimeout; + pq->_awaitData = isAwaitData; + pq->_partial = isPartial; return std::move(pq); } @@ -441,8 +479,8 @@ BSONObj LiteParsedQuery::asFindCommand() const { bob.append(kHintField, _hint); } - if (_skip > 0) { - bob.append(kSkipField, _skip); + if (_skip) { + bob.append(kSkipField, *_skip); } if (_limit) { @@ -727,9 +765,12 @@ Status LiteParsedQuery::init(int ntoskip, const BSONObj& queryObj, const BSONObj& proj, bool fromQueryMessage) { - _skip = ntoskip; _proj = proj.getOwned(); + if (ntoskip) { + _skip = ntoskip; + } + if (ntoreturn) { _batchSize = ntoreturn; } @@ -737,7 +778,7 @@ Status LiteParsedQuery::init(int ntoskip, // Initialize flags passed as 'queryOptions' bit vector. initFromInt(queryOptions); - if (_skip < 0) { + if (_skip && *_skip < 0) { return Status(ErrorCodes::BadValue, "bad skip value in query"); } diff --git a/src/mongo/db/query/lite_parsed_query.h b/src/mongo/db/query/lite_parsed_query.h index 692279aab8b..144a50ae9e8 100644 --- a/src/mongo/db/query/lite_parsed_query.h +++ b/src/mongo/db/query/lite_parsed_query.h @@ -79,12 +79,35 @@ public: /** * Constructs a LiteParseQuery object that can be used to serialize to find command * BSON object. + * + * Input must be fully validated (e.g. if there is a limit value, it must be non-negative). */ - static StatusWith<std::unique_ptr<LiteParsedQuery>> makeAsFindCmd( + static std::unique_ptr<LiteParsedQuery> makeAsFindCmd( NamespaceString nss, - const BSONObj& query, - const BSONObj& sort, - boost::optional<long long> limit); + const BSONObj& filter = BSONObj(), + const BSONObj& projection = BSONObj(), + const BSONObj& sort = BSONObj(), + const BSONObj& hint = BSONObj(), + boost::optional<long long> skip = boost::none, + boost::optional<long long> limit = boost::none, + boost::optional<long long> batchSize = boost::none, + bool wantMore = true, + bool isExplain = false, + const std::string& comment = "", + int maxScan = 0, + int maxTimeMS = 0, + const BSONObj& min = BSONObj(), + const BSONObj& max = BSONObj(), + bool returnKey = false, + bool showRecordId = false, + bool isSnapshot = false, + bool hasReadPref = false, + bool isTailable = false, + bool isSlaveOk = false, + bool isOplogReplay = false, + bool isNoCursorTimeout = false, + bool isAwaitData = false, + bool isPartial = false); /** * Converts this LPQ into a find command. @@ -167,7 +190,7 @@ public: static const long long kDefaultBatchSize; - long long getSkip() const { + boost::optional<long long> getSkip() const { return _skip; } boost::optional<long long> getLimit() const { @@ -317,10 +340,18 @@ private: // {$hint: <String>}, where <String> is the index name hinted. BSONObj _hint; - long long _skip = 0; bool _wantMore = true; + // Must be either unset or positive. Negative skip is illegal and a skip of zero received from + // the client is interpreted as the absence of a skip value. + boost::optional<long long> _skip; + + // Must be either unset or positive. Negative limit is illegal and a limit value of zero + // received from the client is interpreted as the absence of a limit value. boost::optional<long long> _limit; + + // Must be either unset or non-negative. Negative batchSize is illegal but batchSize of 0 is + // allowed. boost::optional<long long> _batchSize; bool _fromCommand = false; diff --git a/src/mongo/db/query/lite_parsed_query_test.cpp b/src/mongo/db/query/lite_parsed_query_test.cpp index 0b3bf3d6855..238e8c7e6c8 100644 --- a/src/mongo/db/query/lite_parsed_query_test.cpp +++ b/src/mongo/db/query/lite_parsed_query_test.cpp @@ -255,31 +255,30 @@ TEST(LiteParsedQueryTest, ForbidMetaSortOnFieldWithoutMetaProject) { .getStatus()); } -TEST(LiteParsedQueryTest, MakeFindCmd) { - auto result = LiteParsedQuery::makeAsFindCmd( - NamespaceString("test.ns"), BSON("x" << 1), BSON("y" << -1), 2); - ASSERT_OK(result.getStatus()); +TEST(LiteParsedQueryTest, MakeAsFindCmdDefaultArgs) { + auto lpq = LiteParsedQuery::makeAsFindCmd(NamespaceString("test.ns")); + ASSERT_TRUE(lpq->isFromFindCommand()); - auto&& lpq = result.getValue(); ASSERT_EQUALS("test.ns", lpq->ns()); - ASSERT_EQUALS(BSON("x" << 1), lpq->getFilter()); - ASSERT_EQUALS(2, *lpq->getLimit()); + ASSERT_EQUALS(BSONObj(), lpq->getFilter()); ASSERT_EQUALS(BSONObj(), lpq->getProj()); - ASSERT_EQUALS(BSON("y" << -1), lpq->getSort()); + ASSERT_EQUALS(BSONObj(), lpq->getSort()); ASSERT_EQUALS(BSONObj(), lpq->getHint()); - ASSERT_EQUALS(BSONObj(), lpq->getMin()); - ASSERT_EQUALS(BSONObj(), lpq->getMax()); - ASSERT_EQUALS(0, lpq->getSkip()); + ASSERT_FALSE(lpq->getSkip()); + ASSERT_FALSE(lpq->getLimit()); + ASSERT_FALSE(lpq->getBatchSize()); + ASSERT_TRUE(lpq->wantMore()); + + ASSERT_FALSE(lpq->isExplain()); + ASSERT_EQ("", lpq->getComment()); ASSERT_EQUALS(0, lpq->getMaxScan()); ASSERT_EQUALS(0, lpq->getMaxTimeMS()); - ASSERT_EQUALS(0, lpq->getOptions()); - ASSERT_FALSE(lpq->getBatchSize()); + ASSERT_EQUALS(BSONObj(), lpq->getMin()); + ASSERT_EQUALS(BSONObj(), lpq->getMax()); - ASSERT_TRUE(lpq->isFromFindCommand()); - ASSERT_FALSE(lpq->isExplain()); ASSERT_FALSE(lpq->returnKey()); ASSERT_FALSE(lpq->showRecordId()); ASSERT_FALSE(lpq->isSnapshot()); @@ -289,61 +288,67 @@ TEST(LiteParsedQueryTest, MakeFindCmd) { ASSERT_FALSE(lpq->isOplogReplay()); ASSERT_FALSE(lpq->isNoCursorTimeout()); ASSERT_FALSE(lpq->isAwaitData()); - ASSERT_FALSE(lpq->isExhaust()); ASSERT_FALSE(lpq->isPartial()); } -TEST(LiteParsedQueryTest, MakeFindCmdNoLimit) { - auto result = LiteParsedQuery::makeAsFindCmd( - NamespaceString("test.ns"), BSON("x" << 1), BSONObj(), boost::none); - ASSERT_OK(result.getStatus()); +TEST(LiteParsedQueryTest, MakeFindCmdAllArgs) { + auto lpq = LiteParsedQuery::makeAsFindCmd(NamespaceString("test.ns"), + BSON("a" << 1), + BSON("b" << 1), + BSON("c" << 1), + BSON("d" << 1), + 4, + 5, + 6, + false, + true, + "this is a comment", + 7, + 8, + BSON("e" << 1), + BSON("f" << 1), + true, + true, + true, + true, + true, + true, + true, + true, + true, + true); + ASSERT_TRUE(lpq->isFromFindCommand()); - auto&& lpq = result.getValue(); ASSERT_EQUALS("test.ns", lpq->ns()); - ASSERT_EQUALS(BSON("x" << 1), lpq->getFilter()); - ASSERT_EQUALS(BSONObj(), lpq->getProj()); - ASSERT_EQUALS(BSONObj(), lpq->getSort()); - ASSERT_EQUALS(BSONObj(), lpq->getHint()); - ASSERT_EQUALS(BSONObj(), lpq->getMin()); - ASSERT_EQUALS(BSONObj(), lpq->getMax()); + ASSERT_EQUALS(BSON("a" << 1), lpq->getFilter()); + ASSERT_EQUALS(BSON("b" << 1), lpq->getProj()); + ASSERT_EQUALS(BSON("c" << 1), lpq->getSort()); + ASSERT_EQUALS(BSON("d" << 1), lpq->getHint()); - ASSERT_EQUALS(0, lpq->getSkip()); - ASSERT_EQUALS(0, lpq->getMaxScan()); - ASSERT_EQUALS(0, lpq->getMaxTimeMS()); - ASSERT_EQUALS(0, lpq->getOptions()); + ASSERT_EQ(4, *lpq->getSkip()); + ASSERT_EQ(5, *lpq->getLimit()); + ASSERT_EQ(6, *lpq->getBatchSize()); + ASSERT_FALSE(lpq->wantMore()); - ASSERT_FALSE(lpq->getBatchSize()); - ASSERT_FALSE(lpq->getLimit()); + ASSERT_TRUE(lpq->isExplain()); + ASSERT_EQ("this is a comment", lpq->getComment()); + ASSERT_EQUALS(7, lpq->getMaxScan()); + ASSERT_EQUALS(8, lpq->getMaxTimeMS()); - ASSERT_TRUE(lpq->isFromFindCommand()); - ASSERT_FALSE(lpq->isExplain()); - ASSERT_FALSE(lpq->returnKey()); - ASSERT_FALSE(lpq->showRecordId()); - ASSERT_FALSE(lpq->isSnapshot()); - ASSERT_FALSE(lpq->hasReadPref()); - ASSERT_FALSE(lpq->isTailable()); - ASSERT_FALSE(lpq->isSlaveOk()); - ASSERT_FALSE(lpq->isOplogReplay()); - ASSERT_FALSE(lpq->isNoCursorTimeout()); - ASSERT_FALSE(lpq->isAwaitData()); - ASSERT_FALSE(lpq->isExhaust()); - ASSERT_FALSE(lpq->isPartial()); -} - -TEST(LiteParsedQueryTest, MakeFindCmdBadLimit) { - Status status = LiteParsedQuery::makeAsFindCmd( - NamespaceString("test.ns"), BSON("x" << 1), BSONObj(), 0LL).getStatus(); - ASSERT_NOT_OK(status); - ASSERT_EQUALS(ErrorCodes::BadValue, status.code()); -} + ASSERT_EQUALS(BSON("e" << 1), lpq->getMin()); + ASSERT_EQUALS(BSON("f" << 1), lpq->getMax()); -TEST(LiteParsedQueryTest, MakeFindCmdLargeLimit) { - auto result = LiteParsedQuery::makeAsFindCmd( - NamespaceString("test.ns"), BSON("x" << 1), BSON("y" << -1), 8LL * 1000 * 1000 * 1000); - ASSERT_OK(result.getStatus()); - - ASSERT_EQUALS(8LL * 1000 * 1000 * 1000, *result.getValue()->getLimit()); + ASSERT_TRUE(lpq->returnKey()); + ASSERT_TRUE(lpq->showRecordId()); + ASSERT_TRUE(lpq->isSnapshot()); + ASSERT_TRUE(lpq->hasReadPref()); + ASSERT_TRUE(lpq->isTailable()); + ASSERT_TRUE(lpq->isSlaveOk()); + ASSERT_TRUE(lpq->isOplogReplay()); + ASSERT_TRUE(lpq->isNoCursorTimeout()); + ASSERT_TRUE(lpq->isAwaitData()); + ASSERT_TRUE(lpq->isPartial()); } // @@ -546,7 +551,7 @@ TEST(LiteParsedQueryTest, ParseFromCommandAllNonOptionFields) { BSONObj expectedHint = BSON("d" << 1); ASSERT_EQUALS(0, expectedHint.woCompare(lpq->getHint())); ASSERT_EQUALS(3, *lpq->getLimit()); - ASSERT_EQUALS(5, lpq->getSkip()); + ASSERT_EQUALS(5, *lpq->getSkip()); ASSERT_EQUALS(90, *lpq->getBatchSize()); ASSERT(lpq->wantMore()); } @@ -587,7 +592,7 @@ TEST(LiteParsedQueryTest, ParseFromCommandLargeSkip) { unique_ptr<LiteParsedQuery> lpq( assertGet(LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain))); - ASSERT_EQUALS(8LL * 1000 * 1000 * 1000, lpq->getSkip()); + ASSERT_EQUALS(8LL * 1000 * 1000 * 1000, *lpq->getSkip()); } // @@ -845,6 +850,19 @@ TEST(LiteParsedQueryTest, ParseFromCommandNegativeSkipError) { ASSERT_NOT_OK(result.getStatus()); } +TEST(LiteParsedQueryTest, ParseFromCommandSkipIsZero) { + BSONObj cmdObj = fromjson( + "{find: 'testns'," + "skip: 0," + "filter: {a: 3}}"); + const NamespaceString nss("test.testns"); + bool isExplain = false; + unique_ptr<LiteParsedQuery> lpq( + assertGet(LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain))); + ASSERT_EQ(BSON("a" << 3), lpq->getFilter()); + ASSERT_FALSE(lpq->getSkip()); +} + TEST(LiteParsedQueryTest, ParseFromCommandNegativeLimitError) { BSONObj cmdObj = fromjson( "{find: 'testns'," @@ -856,6 +874,19 @@ TEST(LiteParsedQueryTest, ParseFromCommandNegativeLimitError) { ASSERT_NOT_OK(result.getStatus()); } +TEST(LiteParsedQueryTest, ParseFromCommandLimitIsZero) { + BSONObj cmdObj = fromjson( + "{find: 'testns'," + "limit: 0," + "filter: {a: 3}}"); + const NamespaceString nss("test.testns"); + bool isExplain = false; + unique_ptr<LiteParsedQuery> lpq( + assertGet(LiteParsedQuery::makeFromFindCommand(nss, cmdObj, isExplain))); + ASSERT_EQ(BSON("a" << 3), lpq->getFilter()); + ASSERT_FALSE(lpq->getLimit()); +} + TEST(LiteParsedQueryTest, ParseFromCommandNegativeBatchSizeError) { BSONObj cmdObj = fromjson( "{find: 'testns'," @@ -1025,7 +1056,9 @@ TEST(LiteParsedQueryTest, DefaultQueryParametersCorrect) { std::unique_ptr<LiteParsedQuery> lpq( assertGet(LiteParsedQuery::makeFromFindCommand(nss, cmdObj, false))); - ASSERT_EQUALS(0, lpq->getSkip()); + ASSERT_FALSE(lpq->getSkip()); + ASSERT_FALSE(lpq->getLimit()); + ASSERT_EQUALS(true, lpq->wantMore()); ASSERT_EQUALS(true, lpq->isFromFindCommand()); ASSERT_EQUALS(false, lpq->isExplain()); diff --git a/src/mongo/db/query/planner_analysis.cpp b/src/mongo/db/query/planner_analysis.cpp index a7ab20ad54a..9f941d788a7 100644 --- a/src/mongo/db/query/planner_analysis.cpp +++ b/src/mongo/db/query/planner_analysis.cpp @@ -467,7 +467,8 @@ QuerySolutionNode* QueryPlannerAnalysis::analyzeSort(const CanonicalQuery& query // N + M items so that the skip stage can discard the first M results. if (lpq.getLimit()) { // We have a true limit. The limit can be combined with the SORT stage. - sort->limit = static_cast<size_t>(*lpq.getLimit()) + static_cast<size_t>(lpq.getSkip()); + sort->limit = + static_cast<size_t>(*lpq.getLimit()) + static_cast<size_t>(lpq.getSkip().value_or(0)); } else if (!lpq.isFromFindCommand() && lpq.getBatchSize()) { // We have an ntoreturn specified by an OP_QUERY style find. This is used // by clients to mean both batchSize and limit. @@ -475,7 +476,8 @@ QuerySolutionNode* QueryPlannerAnalysis::analyzeSort(const CanonicalQuery& query // Overflow here would be bad and could cause a nonsense limit. Cast // skip and limit values to unsigned ints to make sure that the // sum is never stored as signed. (See SERVER-13537). - sort->limit = static_cast<size_t>(*lpq.getBatchSize()) + static_cast<size_t>(lpq.getSkip()); + sort->limit = static_cast<size_t>(*lpq.getBatchSize()) + + static_cast<size_t>(lpq.getSkip().value_or(0)); // This is a SORT with a limit. The wire protocol has a single quantity // called "numToReturn" which could mean either limit or batchSize. @@ -715,9 +717,9 @@ QuerySolution* QueryPlannerAnalysis::analyzeDataAccess(const CanonicalQuery& que } } - if (0 != lpq.getSkip()) { + if (lpq.getSkip()) { SkipNode* skip = new SkipNode(); - skip->skip = lpq.getSkip(); + skip->skip = *lpq.getSkip(); skip->children.push_back(solnRoot); solnRoot = skip; } diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 5b6b497a8e7..f44b800a989 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -295,10 +295,15 @@ StatusWith<std::vector<BSONObj>> ShardRegistry::exhaustiveFind(const HostAndPort status = Status::OK(); }; - unique_ptr<LiteParsedQuery> findCmd( - fassertStatusOK(28688, LiteParsedQuery::makeAsFindCmd(nss, query, sort, limit))); - - QueryFetcher fetcher(_executor.get(), host, nss, findCmd->asFindCommand(), fetcherCallback); + auto lpq = LiteParsedQuery::makeAsFindCmd(nss, + query, + BSONObj(), // projection + sort, + BSONObj(), // hint + boost::none, // skip + limit); + + QueryFetcher fetcher(_executor.get(), host, nss, lpq->asFindCommand(), fetcherCallback); Status scheduleStatus = fetcher.schedule(); if (!scheduleStatus.isOK()) { diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 206522cd24f..3c605861de9 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -29,6 +29,7 @@ env.Library( "router_stage_limit.cpp", "router_stage_merge.cpp", "router_stage_mock.cpp", + "router_stage_skip.cpp", ], LIBDEPS=[ "async_results_merger", @@ -39,6 +40,7 @@ env.CppUnitTest( target="router_exec_stage_test", source=[ "router_stage_limit_test.cpp", + "router_stage_skip_test.cpp", ], LIBDEPS=[ "router_exec_stage", diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 74b146ef1da..ba77b95cc07 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -76,9 +76,7 @@ protected: params.projection = lpq->getProj(); params.limit = lpq->getLimit(); params.batchSize = lpq->getBatchSize(); - if (lpq->getSkip()) { - params.skip = lpq->getSkip(); - } + params.skip = lpq->getSkip(); arm = stdx::make_unique<AsyncResultsMerger>(executor, params, remotes); } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index 10bacf3f546..59f1a6b014b 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -34,6 +34,7 @@ #include "mongo/s/query/router_stage_limit.h" #include "mongo/s/query/router_stage_merge.h" +#include "mongo/s/query/router_stage_skip.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -59,6 +60,10 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( auto leaf = stdx::make_unique<RouterStageMerge>(executor, params, remotes); std::unique_ptr<RouterExecStage> root = std::move(leaf); + if (params.skip) { + root = stdx::make_unique<RouterStageSkip>(std::move(root), *params.skip); + } + if (params.limit) { root = stdx::make_unique<RouterStageLimit>(std::move(root), *params.limit); } diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 4bd51a81476..d87a995ddeb 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -47,6 +47,48 @@ namespace mongo { +namespace { + +/** + * Given the LiteParsedQuery 'lpq' being executed by mongos, returns a copy of the query which is + * suitable for forwarding to the targeted hosts. + */ +std::unique_ptr<LiteParsedQuery> transformQueryForShards(const LiteParsedQuery& lpq) { + // If there is a limit, we forward the sum of the limit and the skip. + boost::optional<long long> newLimit; + if (lpq.getLimit()) { + newLimit = *lpq.getLimit() + lpq.getSkip().value_or(0); + } + + return LiteParsedQuery::makeAsFindCmd(lpq.nss(), + lpq.getFilter(), + lpq.getProj(), + lpq.getSort(), + lpq.getHint(), + boost::none, // Don't forward skip. + newLimit, + lpq.getBatchSize(), + lpq.wantMore(), + lpq.isExplain(), + lpq.getComment(), + lpq.getMaxScan(), + lpq.getMaxTimeMS(), + lpq.getMin(), + lpq.getMax(), + lpq.returnKey(), + lpq.showRecordId(), + lpq.isSnapshot(), + lpq.hasReadPref(), + lpq.isTailable(), + lpq.isSlaveOk(), + lpq.isOplogReplay(), + lpq.isNoCursorTimeout(), + lpq.isAwaitData(), + lpq.isPartial()); +} + +} // namespace + StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, const CanonicalQuery& query, const ReadPreferenceSetting& readPref, @@ -89,11 +131,13 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* txn, remotes.emplace_back(std::move(hostAndPort.getValue())); } - // TODO: handle other query options (skip and projection). + // TODO: handle other query options (projection). ClusterClientCursorParams params(query.nss()); - params.cmdObj = query.getParsed().asFindCommand(); params.sort = query.getParsed().getSort(); - params.limit = query.getParsed().getLimit(); + params.skip = query.getParsed().getSkip(); + + const auto lpqToForward = transformQueryForShards(query.getParsed()); + params.cmdObj = lpqToForward->asFindCommand(); ClusterClientCursorImpl ccc(shardRegistry->getExecutor(), params, remotes); diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index ed31be96974..1c252e7fac0 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -35,12 +35,10 @@ namespace mongo { /** * Passes through the first n results and then returns boost::none. */ -class RouterStageLimit : public RouterExecStage { +class RouterStageLimit final : public RouterExecStage { public: RouterStageLimit(std::unique_ptr<RouterExecStage> child, long long limit); - ~RouterStageLimit() final = default; - StatusWith<boost::optional<BSONObj>> next() final; void kill() final; diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index b9022b9a40f..986de199f14 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -41,14 +41,12 @@ namespace mongo { * merged documents manipulated by the MergerPlanStage pipeline. Used to present a stream of * documents merged from the shards to the stages later in the pipeline. */ -class RouterStageMerge : public RouterExecStage { +class RouterStageMerge final : public RouterExecStage { public: RouterStageMerge(executor::TaskExecutor* executor, const ClusterClientCursorParams& params, const std::vector<HostAndPort>& remotes); - ~RouterStageMerge() final = default; - StatusWith<boost::optional<BSONObj>> next() final; void kill() final; diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index 76f817a58da..1cefe624c52 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -38,7 +38,7 @@ namespace mongo { /** * Passes through the first n results and then returns boost::none. */ -class RouterStageMock : public RouterExecStage { +class RouterStageMock final : public RouterExecStage { public: ~RouterStageMock() final {} diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp new file mode 100644 index 00000000000..51a2abe06af --- /dev/null +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/router_stage_skip.h" + +namespace mongo { + +RouterStageSkip::RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip) + : RouterExecStage(std::move(child)), _skip(skip) { + invariant(skip > 0); +} + +StatusWith<boost::optional<BSONObj>> RouterStageSkip::next() { + while (_skippedSoFar < _skip) { + auto next = getChildStage()->next(); + if (!next.isOK()) { + return next; + } + + if (!next.getValue()) { + return next; + } + + ++_skippedSoFar; + } + + return getChildStage()->next(); +} + +void RouterStageSkip::kill() { + getChildStage()->kill(); +} + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h new file mode 100644 index 00000000000..6f78b31ce90 --- /dev/null +++ b/src/mongo/s/query/router_stage_skip.h @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/s/query/router_exec_stage.h" + +namespace mongo { + +/** + * Skips the first n results from the child and then passes through the remaining results. + */ +class RouterStageSkip final : public RouterExecStage { +public: + RouterStageSkip(std::unique_ptr<RouterExecStage> child, long long skip); + + StatusWith<boost::optional<BSONObj>> next() final; + + void kill() final; + +private: + long long _skip; + + long long _skippedSoFar = 0; +}; + +} // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip_test.cpp b/src/mongo/s/query/router_stage_skip_test.cpp new file mode 100644 index 00000000000..e0116136c2f --- /dev/null +++ b/src/mongo/s/query/router_stage_skip_test.cpp @@ -0,0 +1,153 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/query/router_stage_skip.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/s/query/router_stage_mock.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +namespace { + +TEST(RouterStageSkipTest, SkipIsOne) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 1); + + auto firstResult = skipStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 2)); + + auto secondResult = skipStage->next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(secondResult.getValue()); + ASSERT_EQ(*secondResult.getValue(), BSON("a" << 3)); + + // Once end-of-stream is reached, the skip stage should keep returning boost::none. + auto thirdResult = skipStage->next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(!thirdResult.getValue()); + + auto fourthResult = skipStage->next(); + ASSERT_OK(thirdResult.getStatus()); + ASSERT(!thirdResult.getValue()); +} + +TEST(RouterStageSkipTest, SkipIsThree) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + mockStage->queueResult(BSON("a" << 4)); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + + auto firstResult = skipStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 4)); + + auto secondResult = skipStage->next(); + ASSERT_OK(secondResult.getStatus()); + ASSERT(!secondResult.getValue()); +} + +TEST(RouterStageSkipTest, SkipEqualToResultSetSize) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 3); + + auto firstResult = skipStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(!firstResult.getValue()); +} + +TEST(RouterStageSkipTest, SkipExceedsResultSetSize) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 100); + + auto firstResult = skipStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(!firstResult.getValue()); +} + +TEST(RouterStageSkipTest, ErrorWhileSkippingResults) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); + + auto firstResult = skipStage->next(); + ASSERT_NOT_OK(firstResult.getStatus()); + ASSERT_EQ(firstResult.getStatus(), ErrorCodes::BadValue); + ASSERT_EQ(firstResult.getStatus().reason(), "bad thing happened"); +} + +TEST(RouterStageSkipTest, ErrorAfterSkippingResults) { + auto mockStage = stdx::make_unique<RouterStageMock>(); + mockStage->queueResult(BSON("a" << 1)); + mockStage->queueResult(BSON("a" << 2)); + mockStage->queueResult(BSON("a" << 3)); + mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened")); + + auto skipStage = stdx::make_unique<RouterStageSkip>(std::move(mockStage), 2); + + auto firstResult = skipStage->next(); + ASSERT_OK(firstResult.getStatus()); + ASSERT(firstResult.getValue()); + ASSERT_EQ(*firstResult.getValue(), BSON("a" << 3)); + + auto secondResult = skipStage->next(); + ASSERT_NOT_OK(secondResult.getStatus()); + ASSERT_EQ(secondResult.getStatus(), ErrorCodes::BadValue); + ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened"); +} + +} // namespace + +} // namespace mongo |