From 62172ae65d75c78e9e936784d0086c4b8133279b Mon Sep 17 00:00:00 2001
From: David Storch
Date: Fri, 15 Jan 2016 13:16:53 -0500
Subject: SERVER-21797 unify cursor response commands to respond with up to
 16MB of data

Find, getMore, aggregate, listIndexes, and listCollections can now return up
to 16MB of data per batch, unless a batchSize limits the number of response
documents.
---
 jstests/core/find9.js                      | 16 ++++++------
 jstests/core/find_getmore_bsonsize.js      | 36 +++++++++++++++++----------
 jstests/core/single_batch.js               | 12 ++++-----
 src/mongo/db/commands/find_cmd.cpp         |  7 +++---
 src/mongo/db/commands/getmore_cmd.cpp      | 13 +++-------
 src/mongo/db/commands/list_collections.cpp | 11 +++++---
 src/mongo/db/commands/list_indexes.cpp     | 11 +++++---
 src/mongo/db/commands/pipeline_command.cpp |  7 +++---
 src/mongo/db/query/find.cpp                | 21 +++++++++++-----
 src/mongo/db/query/find_common.cpp         | 28 ++++++++++-----------
 src/mongo/db/query/find_common.h           | 40 ++++++++++++++++++------------
 src/mongo/s/query/cluster_find.cpp         | 32 +++++++++---------------
 12 files changed, 127 insertions(+), 107 deletions(-)

diff --git a/jstests/core/find9.js b/jstests/core/find9.js
index 85adf93cc98..8c2b7ac282b 100644
--- a/jstests/core/find9.js
+++ b/jstests/core/find9.js
@@ -4,25 +4,25 @@
 t = db.jstests_find9;
 t.drop();
 
 big = new Array( 500000 ).toString();
-for( i = 0; i < 20; ++i ) {
+for( i = 0; i < 60; ++i ) {
     t.save( { a:i, b:big } );
 }
 
 // Check size limit with a simple query.
-assert.eq( 20, t.find( {}, { a:1 } ).objsLeftInBatch() ); // Projection resizes the result set.
-assert.gt( 20, t.find().objsLeftInBatch() );
+assert.eq( 60, t.find( {}, { a:1 } ).objsLeftInBatch() ); // Projection resizes the result set.
+assert.gt( 60, t.find().objsLeftInBatch() );
 
 // Check size limit on a query with an explicit batch size.
-assert.eq( 20, t.find( {}, { a:1 } ).batchSize( 30 ).objsLeftInBatch() );
-assert.gt( 20, t.find().batchSize( 30 ).objsLeftInBatch() );
+assert.eq( 60, t.find( {}, { a:1 } ).batchSize( 80 ).objsLeftInBatch() );
+assert.gt( 60, t.find().batchSize( 80 ).objsLeftInBatch() );
 
-for( i = 0; i < 20; ++i ) {
+for( i = 0; i < 60; ++i ) {
     t.save( { a:i, b:big } );
 }
 
 // Check size limit with get more.
-c = t.find().batchSize( 30 );
+c = t.find().batchSize( 80 );
 while( c.hasNext() ) {
-    assert.gt( 20, c.objsLeftInBatch() );
+    assert.gt( 60, c.objsLeftInBatch() );
     c.next();
 }
diff --git a/jstests/core/find_getmore_bsonsize.js b/jstests/core/find_getmore_bsonsize.js
index c77ea7ec1b0..fdad2b1f1d6 100644
--- a/jstests/core/find_getmore_bsonsize.js
+++ b/jstests/core/find_getmore_bsonsize.js
@@ -12,14 +12,7 @@
     var oneKB = 1024;
     var oneMB = 1024 * oneKB;
 
-    function assertCursorExhausted(id) {
-        var cmdRes = db.runCommand({getMore: id, collection: collName});
-        assert.eq(cmdRes.cursor.id, NumberLong(0));
-        assert.eq(cmdRes.cursor.ns, coll.getFullName());
-        assert.eq(cmdRes.cursor.nextBatch, []);
-    }
-
-    // Build a 1 MB - 1 KB) string.
+    // Build a (1 MB - 1 KB) string.
     var smallStr = 'x';
     while (smallStr.length < oneMB) {
         smallStr += smallStr;
@@ -47,12 +40,12 @@
     assert.eq(cmdRes.cursor.ns, coll.getFullName());
     assert.eq(cmdRes.cursor.firstBatch.length, 1);
 
-    // The 16 MB doc should be returned alone on getMore. The following getMore will return nothing.
+    // The 16 MB doc should be returned alone on getMore. This is the last document in the
+    // collection, so the server should close the cursor.
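+    // The reply should therefore carry both the final document and the closed cursor in a
+    // single response; sketched loosely (shape illustrative, not asserted verbatim):
+    //   {cursor: {id: NumberLong(0), ns: coll.getFullName(), nextBatch: [<the 16 MB doc>]}}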
     cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName});
-    assert.eq(cmdRes.cursor.id, cmdRes.cursor.id);
+    assert.eq(cmdRes.cursor.id, NumberLong(0));
     assert.eq(cmdRes.cursor.ns, coll.getFullName());
     assert.eq(cmdRes.cursor.nextBatch.length, 1);
-    assertCursorExhausted(cmdRes.cursor.id);
 
     // Setup a cursor without returning any results (batchSize of zero).
     cmdRes = db.runCommand({find: collName, batchSize: 0});
@@ -69,8 +62,25 @@
-    // Second getMore should return the second doc and a third will close the cursor.
+    // Second getMore should return the second doc and close the cursor.
     cmdRes = db.runCommand({getMore: cmdRes.cursor.id, collection: collName});
-    assert.eq(cmdRes.cursor.id, cmdRes.cursor.id);
+    assert.eq(cmdRes.cursor.id, NumberLong(0));
     assert.eq(cmdRes.cursor.ns, coll.getFullName());
     assert.eq(cmdRes.cursor.nextBatch.length, 1);
-    assertCursorExhausted(cmdRes.cursor.id);
+
+    coll.drop();
+
+    // Insert a document of exactly 16MB and make sure the find command can return it.
+    bigStr = 'y';
+    while (bigStr.length < (16 * oneMB)) {
+        bigStr += bigStr;
+    }
+    bigStr = bigStr.substring(0, (16 * oneMB) - 32);
+    var maxSizeDoc = {_id: 0, padding: bigStr};
+    assert.eq(Object.bsonsize(maxSizeDoc), 16 * oneMB);
+    assert.writeOK(coll.insert(maxSizeDoc));
+
+    cmdRes = db.runCommand({find: collName});
+    assert.commandWorked(cmdRes);
+    assert.eq(cmdRes.cursor.id, NumberLong(0));
+    assert.eq(cmdRes.cursor.ns, coll.getFullName());
+    assert.eq(cmdRes.cursor.firstBatch.length, 1);
 })();
diff --git a/jstests/core/single_batch.js b/jstests/core/single_batch.js
index 012901b9768..ccf9f73362f 100644
--- a/jstests/core/single_batch.js
+++ b/jstests/core/single_batch.js
@@ -8,14 +8,14 @@
     // Approximately 1 MB.
     var padding = new Array(1024 * 1024).join("x");
 
-    // Insert ~10 MB of data.
-    for (var i = 0; i < 10; i++) {
+    // Insert ~20 MB of data.
+    for (var i = 0; i < 20; i++) {
         assert.writeOK(coll.insert({_id: i, padding: padding}));
     }
 
-    // The limit is 8, but we should end up with fewer documents since 8 docs won't fit in a single
-    // 4 MB batch.
-    var numResults = coll.find().limit(-8).itcount();
-    assert.lt(numResults, 8);
+    // The limit is 18, but we should end up with fewer documents since 18 docs won't fit in a
+    // single 16 MB batch.
+    var numResults = coll.find().limit(-18).itcount();
+    assert.lt(numResults, 18);
     assert.gt(numResults, 0);
 })();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 6ab582e0365..9d0ae338312 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -302,11 +302,10 @@ public:
         BSONObj obj;
         PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
         long long numResults = 0;
-        while (!FindCommon::enoughForFirstBatch(pq, numResults, firstBatch.bytesUsed()) &&
+        while (!FindCommon::enoughForFirstBatch(pq, numResults) &&
               PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
-            // If adding this object will cause us to exceed the BSON size limit, then we stash
-            // it for later.
-            if (firstBatch.bytesUsed() + obj.objsize() > BSONObjMaxUserSize && numResults > 0) {
+            // If we can't fit this result inside the current batch, then we stash it for later.
+            if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) {
                 exec->enqueue(obj);
                 break;
             }
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 6aa0f8c5112..555afad7cf2 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -426,11 +426,11 @@ public:
         // timeout to the user.
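+        // Batch construction below is bounded in two ways: enoughForGetMore() checks the
+        // document count before each fetch, and haveSpaceForNext() checks the byte budget
+        // before each append; a document that does not fit is enqueue()d for the next batch
+        // rather than dropped.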
         BSONObj obj;
         try {
-            while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
-                // If adding this object will cause us to exceed the BSON size limit, then we
+            while (!FindCommon::enoughForGetMore(request.batchSize.value_or(0), *numResults) &&
+                   PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
+                // If adding this object will cause us to exceed the message size limit, then we
                 // stash it for later.
-                if (nextBatch->bytesUsed() + obj.objsize() > BSONObjMaxUserSize &&
-                    *numResults > 0) {
+                if (!FindCommon::haveSpaceForNext(obj, *numResults, nextBatch->bytesUsed())) {
                     exec->enqueue(obj);
                     break;
                 }
@@ -438,11 +438,6 @@ public:
                 // Add result to output buffer.
                 nextBatch->append(obj);
                 (*numResults)++;
-
-                if (FindCommon::enoughForGetMore(
-                        request.batchSize.value_or(0), *numResults, nextBatch->bytesUsed())) {
-                    break;
-                }
             }
         } catch (const UserException& except) {
             if (isAwaitData && except.getCode() == ErrorCodes::ExceededTimeLimit) {
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index f467c14b3df..c33c246b2c1 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -243,15 +243,20 @@ public:
 
         BSONArrayBuilder firstBatch;
 
-        const int byteLimit = FindCommon::kMaxBytesToReturnToClientAtOnce;
-        for (long long objCount = 0; objCount < batchSize && firstBatch.len() < byteLimit;
-             objCount++) {
+        for (long long objCount = 0; objCount < batchSize; objCount++) {
             BSONObj next;
             PlanExecutor::ExecState state = exec->getNext(&next, NULL);
             if (state == PlanExecutor::IS_EOF) {
                 break;
             }
             invariant(state == PlanExecutor::ADVANCED);
+
+            // If we can't fit this result inside the current batch, then we stash it for later.
+            if (!FindCommon::haveSpaceForNext(next, objCount, firstBatch.len())) {
+                exec->enqueue(next);
+                break;
+            }
+
             firstBatch.append(next);
         }
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 500a45634a8..98424e25bcd 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -194,15 +194,20 @@ public:
 
         BSONArrayBuilder firstBatch;
 
-        const int byteLimit = FindCommon::kMaxBytesToReturnToClientAtOnce;
-        for (long long objCount = 0; objCount < batchSize && firstBatch.len() < byteLimit;
-             objCount++) {
+        for (long long objCount = 0; objCount < batchSize; objCount++) {
             BSONObj next;
             PlanExecutor::ExecState state = exec->getNext(&next, NULL);
             if (state == PlanExecutor::IS_EOF) {
                 break;
             }
             invariant(state == PlanExecutor::ADVANCED);
+
+            // If we can't fit this result inside the current batch, then we stash it for later.
+            if (!FindCommon::haveSpaceForNext(next, objCount, firstBatch.len())) {
+                exec->enqueue(next);
+                break;
+            }
+
             firstBatch.append(next);
         }
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index f8bbf2fc81c..d1dbfb582ea 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -86,7 +86,6 @@ static bool handleCursorCommand(OperationContext* txn,
     // can't use result BSONObjBuilder directly since it won't handle exceptions correctly.
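+    // Aggregation now shares the same per-batch byte budget as find and getMore
+    // (FindCommon::kMaxBytesToReturnToClientAtOnce); an overflowing result is enqueue()d and
+    // returned in a later batch.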
     BSONArrayBuilder resultsArray;
-    const int byteLimit = FindCommon::kMaxBytesToReturnToClientAtOnce;
     BSONObj next;
     for (int objCount = 0; objCount < batchSize; objCount++) {
         // The initial getNext() on a PipelineProxyStage may be very expensive so we don't
@@ -98,9 +97,9 @@ static bool handleCursorCommand(OperationContext* txn,
             break;
         }
 
-        // If adding this object will cause us to exceed the BSON size limit, then we stash it for
-        // later.
-        if (resultsArray.len() + next.objsize() > byteLimit) {
+        // If adding this object will cause us to exceed the message size limit, then we stash it
+        // for later.
+        if (!FindCommon::haveSpaceForNext(next, objCount, resultsArray.len())) {
             exec->enqueue(next);
             break;
         }
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index a56a282817a..b6450108372 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -213,7 +213,14 @@ void generateBatch(int ntoreturn,
     PlanExecutor* exec = cursor->getExecutor();
 
     BSONObj obj;
-    while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
+    while (!FindCommon::enoughForGetMore(ntoreturn, *numResults) &&
+           PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
+        // If we can't fit this result inside the current batch, then we stash it for later.
+        if (!FindCommon::haveSpaceForNext(obj, *numResults, bb->len())) {
+            exec->enqueue(obj);
+            break;
+        }
+
         // Add result to output buffer.
         bb->appendBuf((void*)obj.objdata(), obj.objsize());
 
@@ -227,10 +234,6 @@ void generateBatch(int ntoreturn,
                 *slaveReadTill = e.timestamp();
             }
         }
-
-        if (FindCommon::enoughForGetMore(ntoreturn, *numResults, bb->len())) {
-            break;
-        }
     }
 
     if (PlanExecutor::DEAD == *state || PlanExecutor::FAILURE == *state) {
@@ -589,6 +592,12 @@ std::string runQuery(OperationContext* txn,
     }
 
     while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
+        // If we can't fit this result inside the current batch, then we stash it for later.
+        if (!FindCommon::haveSpaceForNext(obj, numResults, bb.len())) {
+            exec->enqueue(obj);
+            break;
+        }
+
         // Add result to output buffer.
         bb.appendBuf((void*)obj.objdata(), obj.objsize());
 
@@ -603,7 +612,7 @@ std::string runQuery(OperationContext* txn,
         }
     }
 
-    if (FindCommon::enoughForFirstBatch(pq, numResults, bb.len())) {
+    if (FindCommon::enoughForFirstBatch(pq, numResults)) {
         LOG(5) << "Enough for first batch, wantMore=" << pq.wantMore()
                << " ntoreturn=" << pq.getNToReturn().value_or(0)
                << " numResults=" << numResults << endl;
diff --git a/src/mongo/db/query/find_common.cpp b/src/mongo/db/query/find_common.cpp
index 54e652be8a6..d09ad8fd65f 100644
--- a/src/mongo/db/query/find_common.cpp
+++ b/src/mongo/db/query/find_common.cpp
@@ -38,26 +38,24 @@ namespace mongo {
 
 MONGO_FP_DECLARE(keepCursorPinnedDuringGetMore);
 
-bool FindCommon::enoughForFirstBatch(const LiteParsedQuery& pq,
-                                     long long numDocs,
-                                     int bytesBuffered) {
+bool FindCommon::enoughForFirstBatch(const LiteParsedQuery& pq, long long numDocs) {
     if (!pq.getEffectiveBatchSize()) {
-        // If there is no batch size, we stop generating additional results as soon as we have
-        // either 101 documents or at least 1MB of data.
-        return (bytesBuffered > 1024 * 1024) || numDocs >= LiteParsedQuery::kDefaultBatchSize;
+        // We enforce a default batch size for the initial find if no batch size is specified.
+        return numDocs >= LiteParsedQuery::kDefaultBatchSize;
     }
 
-    // If there is a batch size, we add results until either satisfying this batch size or exceeding
-    // the 4MB size threshold.
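+    // With an explicit batchSize, only the document count is checked here; the byte budget is
+    // enforced per document by haveSpaceForNext() below.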
-    return numDocs >= pq.getEffectiveBatchSize().value() ||
-        bytesBuffered > kMaxBytesToReturnToClientAtOnce;
+    return numDocs >= pq.getEffectiveBatchSize().value();
 }
 
-bool FindCommon::enoughForGetMore(long long effectiveBatchSize,
-                                  long long numDocs,
-                                  int bytesBuffered) {
-    return (effectiveBatchSize && numDocs >= effectiveBatchSize) ||
-        (bytesBuffered > kMaxBytesToReturnToClientAtOnce);
+bool FindCommon::haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, int bytesBuffered) {
+    invariant(numDocs >= 0);
+    if (!numDocs) {
+        // Allow the first output document to exceed the limit to ensure we can always make
+        // progress.
+        return true;
+    }
+
+    return (bytesBuffered + nextDoc.objsize()) <= kMaxBytesToReturnToClientAtOnce;
 }
 
 BSONObj FindCommon::transformSortSpec(const BSONObj& sortSpec) {
diff --git a/src/mongo/db/query/find_common.h b/src/mongo/db/query/find_common.h
index 6029d511157..93483949ee7 100644
--- a/src/mongo/db/query/find_common.h
+++ b/src/mongo/db/query/find_common.h
@@ -26,6 +26,7 @@
  * it in the license file.
  */
 
+#include "mongo/bson/bsonobj.h"
 #include "mongo/util/fail_point_service.h"
 
 namespace mongo {
@@ -42,34 +43,41 @@ MONGO_FP_FORWARD_DECLARE(keepCursorPinnedDuringGetMore);
  */
 class FindCommon {
 public:
-    // The size threshold at which we stop adding result documents to a getMore response or a find
-    // response that has a batchSize set (i.e. once the sum of the document sizes in bytes exceeds
-    // this value, no further documents are added).
-    static const int kMaxBytesToReturnToClientAtOnce = 4 * 1024 * 1024;
+    // The maximum amount of user data to return to a client in a single batch.
+    //
+    // This max may be exceeded by epsilon for output documents that approach the maximum user
+    // document size. That is, if we must return a BSONObjMaxUserSize document, then the total
+    // response size will be BSONObjMaxUserSize plus the amount of size required for the message
+    // header and the cursor response "envelope". (The envelope contains namespace and cursor id
+    // info.)
+    static const int kMaxBytesToReturnToClientAtOnce = BSONObjMaxUserSize;
 
     // The initial size of the query response buffer.
     static const int kInitReplyBufferSize = 32768;
 
     /**
-     * Returns true if enough results have been prepared to stop adding more to the first batch.
+     * Returns true if the batchSize for the initial find has been satisfied.
      *
      * If 'pq' does not have a batchSize, the default batchSize is respected.
     */
-    static bool enoughForFirstBatch(const LiteParsedQuery& pq,
-                                    long long numDocs,
-                                    int bytesBuffered);
+    static bool enoughForFirstBatch(const LiteParsedQuery& pq, long long numDocs);
 
     /**
-     * Returns true if enough results have been prepared to stop adding more to a getMore batch.
+     * Returns true if the batchSize for the getMore has been satisfied.
      *
-     * An 'effectiveBatchSize' value of zero is interpreted as the absence of a batchSize;
-     * in this case, returns true only once the size threshold is exceeded. If 'effectiveBatchSize'
-     * is positive, returns true once we have either satisfied the batch size or exceeded the
-     * size threshold.
+     * An 'effectiveBatchSize' value of zero is interpreted as the absence of a batchSize, in
+     * which case this method returns false.
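+     * For example, enoughForGetMore(0, 5000) returns false (no batchSize, so only the byte
+     * budget enforced by haveSpaceForNext() bounds the batch), while enoughForGetMore(100, 100)
+     * returns true.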
     */
-    static bool enoughForGetMore(long long effectiveBatchSize,
-                                 long long numDocs,
-                                 int bytesBuffered);
+    static bool enoughForGetMore(long long effectiveBatchSize, long long numDocs) {
+        return effectiveBatchSize && numDocs >= effectiveBatchSize;
+    }
+
+    /**
+     * Given the number of docs ('numDocs') and bytes ('bytesBuffered') currently buffered as a
+     * response to a cursor-generating command, returns true if there are enough remaining bytes in
+     * our budget to fit 'nextDoc'.
+     */
+    static bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, int bytesBuffered);
 
     /**
      * Transforms the raw sort spec into one suitable for use as the ordering specification in
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 6b1df73fad1..40bbffc28dc 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -282,7 +282,7 @@ StatusWith runQueryWithoutRetrying(OperationContext* txn,
     auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
     int bytesBuffered = 0;
-    while (!FindCommon::enoughForFirstBatch(query.getParsed(), results->size(), bytesBuffered)) {
+    while (!FindCommon::enoughForFirstBatch(query.getParsed(), results->size())) {
         auto next = ccc->next();
         if (!next.isOK()) {
             return next.getStatus();
@@ -299,19 +299,16 @@ StatusWith runQueryWithoutRetrying(OperationContext* txn,
             break;
         }
 
-        // If adding this object will cause us to exceed the BSON size limit, then we stash it for
-        // later. By using BSONObjMaxUserSize, we ensure that there is enough room for the
-        // "envelope" (e.g. the "ns" and "id" fields included in the response) before exceeding
-        // BSONObjMaxInternalSize.
-        int sizeEstimate = bytesBuffered + next.getValue()->objsize() +
-            ((results->size() + 1U) * kPerDocumentOverheadBytesUpperBound);
-        if (sizeEstimate > BSONObjMaxUserSize && !results->empty()) {
+        // If adding this object will cause us to exceed the message size limit, then we stash it
+        // for later.
+        if (!FindCommon::haveSpaceForNext(*next.getValue(), results->size(), bytesBuffered)) {
            ccc->queueResult(*next.getValue());
             break;
         }
 
-        // Add doc to the batch.
-        bytesBuffered += next.getValue()->objsize();
+        // Add doc to the batch. Account for the space overhead associated with returning this doc
+        // inside a BSON array.
+        bytesBuffered += (next.getValue()->objsize() + kPerDocumentOverheadBytesUpperBound);
         results->push_back(std::move(*next.getValue()));
     }
 
@@ -434,7 +431,7 @@ StatusWith ClusterFind::runGetMore(OperationContext* txn,
     long long batchSize = request.batchSize.value_or(0);
     long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
     auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
-    while (!FindCommon::enoughForGetMore(batchSize, batch.size(), bytesBuffered)) {
+    while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
         auto next = pinnedCursor.getValue().next();
         if (!next.isOK()) {
             return next.getStatus();
@@ -448,19 +445,14 @@ StatusWith ClusterFind::runGetMore(OperationContext* txn,
             break;
         }
 
-        // If adding this object will cause us to exceed the BSON size limit, then we stash it for
-        // later. By using BSONObjMaxUserSize, we ensure that there is enough room for the
-        // "envelope" (e.g. the "ns" and "id" fields included in the response) before exceeding
-        // BSONObjMaxInternalSize.
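+        // If adding this object will cause us to exceed the message size limit, then we stash it
+        // for later, mirroring the find path above.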
-        int sizeEstimate = bytesBuffered + next.getValue()->objsize() +
-            ((batch.size() + 1U) * kPerDocumentOverheadBytesUpperBound);
-        if (sizeEstimate > BSONObjMaxUserSize && !batch.empty()) {
+        if (!FindCommon::haveSpaceForNext(*next.getValue(), batch.size(), bytesBuffered)) {
             pinnedCursor.getValue().queueResult(*next.getValue());
             break;
         }
 
-        // Add doc to the batch.
-        bytesBuffered += next.getValue()->objsize();
+        // Add doc to the batch. Account for the space overhead associated with returning this doc
+        // inside a BSON array.
+        bytesBuffered += (next.getValue()->objsize() + kPerDocumentOverheadBytesUpperBound);
         batch.push_back(std::move(*next.getValue()));
     }
-- 
cgit v1.2.1
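
As a companion to the patch, here is a minimal, self-contained C++ sketch of the two batching
rules it unifies. It is illustrative only: Doc stands in for BSONObj, fillBatch stands in for
the server's executor loop, and the constants mirror (but are not) the server's definitions.

    // Standalone sketch of the post-SERVER-21797 batching rules. Only document sizes matter
    // here; everything else about the real server types is abstracted away.
    #include <cstddef>
    #include <iostream>
    #include <vector>

    namespace {

    const int kBSONObjMaxUserSize = 16 * 1024 * 1024;                 // 16 MB, as in the server
    const int kMaxBytesToReturnToClientAtOnce = kBSONObjMaxUserSize;  // the new per-batch budget

    struct Doc {
        int size;  // stand-in for BSONObj::objsize()
    };

    // Mirrors FindCommon::enoughForGetMore: zero means "no batchSize", so only the byte
    // budget (below) ever ends the batch.
    bool enoughForGetMore(long long effectiveBatchSize, long long numDocs) {
        return effectiveBatchSize && numDocs >= effectiveBatchSize;
    }

    // Mirrors FindCommon::haveSpaceForNext: the first document always fits, so a single
    // 16 MB document can still be returned; after that the running total is capped.
    bool haveSpaceForNext(const Doc& nextDoc, long long numDocs, int bytesBuffered) {
        if (numDocs == 0) {
            return true;
        }
        return bytesBuffered + nextDoc.size <= kMaxBytesToReturnToClientAtOnce;
    }

    // The shape of the fill loop after this patch: document count checked before the fetch,
    // byte budget checked before the append, and an overflowing doc left for the next batch
    // (the server calls this "enqueueing").
    std::vector<Doc> fillBatch(std::vector<Doc>& source, long long batchSize) {
        std::vector<Doc> batch;
        int bytesBuffered = 0;
        while (!enoughForGetMore(batchSize, batch.size()) && !source.empty()) {
            Doc next = source.front();
            if (!haveSpaceForNext(next, batch.size(), bytesBuffered)) {
                break;  // 'next' stays in 'source' for the following batch
            }
            source.erase(source.begin());
            bytesBuffered += next.size;
            batch.push_back(next);
        }
        return batch;
    }

    }  // namespace

    int main() {
        // Twenty ~1 MB docs with no batchSize: roughly 16 now fit in one batch, where the old
        // 4 MB threshold stopped at about 4 (compare jstests/core/single_batch.js above).
        std::vector<Doc> docs(20, Doc{1024 * 1024});
        std::cout << "no batchSize: " << fillBatch(docs, 0).size() << " docs\n";

        // A lone 16 MB doc still fits, because the first doc bypasses the budget check.
        std::vector<Doc> huge{Doc{kBSONObjMaxUserSize}};
        std::cout << "single 16MB doc: " << fillBatch(huge, 0).size() << " docs\n";
        return 0;
    }

Compiled and run, this prints 16 documents for the ~1 MB case and a single document for the
16 MB case, which is the behavior the updated single_batch.js and find_getmore_bsonsize.js
tests assert.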