diff options
author | David Storch <david.storch@10gen.com> | 2015-03-12 19:23:39 -0400 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-03-27 18:29:57 -0400 |
commit | 617f0bd0803ecb2d40c138d363149dc8b46d6345 (patch) | |
tree | 593e6049b26622b09dda887671cbc08ad39899c9 /src | |
parent | 4e2bd6f18b2fc9b27465e098331562e0f06fc40d (diff) | |
download | mongo-617f0bd0803ecb2d40c138d363149dc8b46d6345.tar.gz |
SERVER-17283 initial getMore command implementation
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 277 | ||||
-rw-r--r-- | src/mongo/db/query/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 109 | ||||
-rw-r--r-- | src/mongo/db/query/find.h | 43 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request.cpp | 117 | ||||
-rw-r--r-- | src/mongo/db/query/getmore_request.h | 73 |
8 files changed, 611 insertions, 73 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 660ae1c2522..6826423bced 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -734,6 +734,7 @@ serverOnlyFiles = [ "db/background.cpp", "db/commands/fsync.cpp", "db/commands/geo_near_cmd.cpp", "db/commands/get_last_error.cpp", + "db/commands/getmore_cmd.cpp", "db/commands/group.cpp", "db/commands/index_filter_commands.cpp", "db/commands/list_collections.cpp", diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 3d82789d910..731267970ff 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -32,6 +32,7 @@ #include <memory> +#include "mongo/base/disallow_copying.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" @@ -56,20 +57,21 @@ namespace mongo { * A command for running .find() queries. */ class FindCmd : public Command { + MONGO_DISALLOW_COPYING(FindCmd); public: FindCmd() : Command("find") { } - virtual bool isWriteCommandForConfigServer() const { return false; } + bool isWriteCommandForConfigServer() const override { return false; } - virtual bool slaveOk() const { return false; } + bool slaveOk() const override { return false; } - virtual bool slaveOverrideOk() const { return true; } + bool slaveOverrideOk() const override { return true; } - virtual bool maintenanceOk() const { return false; } + bool maintenanceOk() const override { return false; } - virtual bool adminOnly() const { return false; } + bool adminOnly() const override { return false; } - virtual void help(std::stringstream& help) const { + void help(std::stringstream& help) const override { help << "query for documents"; } @@ -77,11 +79,11 @@ namespace mongo { * A find command does not increment the command counter, but rather increments the * query counter. */ - bool shouldAffectCommandCounter() const { return false; } + bool shouldAffectCommandCounter() const override { return false; } - virtual Status checkAuthForCommand(ClientBasic* client, - const std::string& dbname, - const BSONObj& cmdObj) { + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) override { AuthorizationSession* authzSession = client->getAuthorizationSession(); ResourcePattern pattern = parseResourcePattern(dbname, cmdObj); @@ -92,11 +94,11 @@ namespace mongo { return Status(ErrorCodes::Unauthorized, "unauthorized"); } - virtual Status explain(OperationContext* txn, - const std::string& dbname, - const BSONObj& cmdObj, - ExplainCommon::Verbosity verbosity, - BSONObjBuilder* out) const { + Status explain(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + ExplainCommon::Verbosity verbosity, + BSONObjBuilder* out) const override { const std::string fullns = parseNs(dbname, cmdObj); const NamespaceString nss(fullns); @@ -166,19 +168,26 @@ namespace mongo { * (i.e. call to shardingState.needCollectionMetadata() below), shard version * information should be passed as part of the command parameter. */ - virtual bool run(OperationContext* txn, - const std::string& dbname, - BSONObj& cmdObj, - int options, - std::string& errmsg, - BSONObjBuilder& result, - bool fromRepl) { + bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result, + bool fromRepl) override { const std::string fullns = parseNs(dbname, cmdObj); const NamespaceString nss(fullns); // Although it is a command, a find command gets counted as a query. globalOpCounters.gotQuery(); + if (txn->getClient()->isInDirectClient()) { + return appendCommandStatus(result, + Status(ErrorCodes::IllegalOperation, + "Cannot run find command from " + "inside DBDirectClient")); + } + // 1a) Parse the command BSON to a LiteParsedQuery. std::unique_ptr<LiteParsedQuery> lpq; { diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp new file mode 100644 index 00000000000..308146354dc --- /dev/null +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -0,0 +1,277 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include <memory> +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/audit.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/cursor_manager.h" +#include "mongo/db/clientcursor.h" +#include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/global_environment_experiment.h" +#include "mongo/db/query/find.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/db/stats/counters.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + + /** + * A command for running getMore() against an existing cursor registered with a + * CursorManager. + * + * Can be used in combination with any cursor-generating command (e.g. find, aggregate, + * listIndexes). + */ + class GetMoreCmd : public Command { + MONGO_DISALLOW_COPYING(GetMoreCmd); + public: + GetMoreCmd() : Command("getMore") { } + + bool isWriteCommandForConfigServer() const override { return false; } + + bool slaveOk() const override { return false; } + + bool slaveOverrideOk() const override { return true; } + + bool maintenanceOk() const override { return false; } + + bool adminOnly() const override { return false; } + + void help(std::stringstream& help) const override { + help << "retrieve more results from an existing cursor"; + } + + /** + * A getMore command increments the getMore counter, not the command counter. + */ + bool shouldAffectCommandCounter() const override { return false; } + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return GetMoreRequest::parseNs(dbname, cmdObj); + } + + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) override { + StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + const GetMoreRequest& request = parseStatus.getValue(); + + Status authzStatus = client->getAuthorizationSession()->checkAuthForGetMore( + request.nss, request.cursorid); + audit::logGetMoreAuthzCheck(client, request.nss, request.cursorid, authzStatus.code()); + + return authzStatus; + } + + /** + * Generates the next batch of results for a ClientCursor. + * + * TODO: Do we need to support some equivalent of OP_REPLY responseFlags? + * + * TODO: Is it possible to support awaitData? + */ + bool run(OperationContext* txn, + const std::string& dbname, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result, + bool fromRepl) override { + // Counted as a getMore, not as a command. + globalOpCounters.gotGetMore(); + + if (txn->getClient()->isInDirectClient()) { + return appendCommandStatus(result, + Status(ErrorCodes::IllegalOperation, + "Cannot run getMore command from " + "inside DBDirectClient")); + } + + StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj); + if (!parseStatus.isOK()) { + return appendCommandStatus(result, parseStatus.getStatus()); + } + const GetMoreRequest& request = parseStatus.getValue(); + + // Depending on the type of cursor being operated on, we hold locks for the whole + // getMore, or none of the getMore, or part of the getMore. The three cases in detail: + // + // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore. + // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors + // don't own any collection state. + // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and + // "unpinCollLock". This is because agg cursors handle locking internally (hence the + // release), but the pin and unpin of the cursor must occur under the collection + // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because + // AutoGetCollectionForRead checks the sharding version (and we want the relock for + // the unpin to succeed even if the sharding version has changed). + // + // Note that we declare our locks before our ClientCursorPin, in order to ensure that + // the pin's destructor is called before the lock destructors (so that the unpin occurs + // under the lock). + std::unique_ptr<AutoGetCollectionForRead> ctx; + std::unique_ptr<Lock::DBLock> unpinDBLock; + std::unique_ptr<Lock::CollectionLock> unpinCollLock; + + CursorManager* cursorManager; + CursorManager* globalCursorManager = CursorManager::getGlobalCursorManager(); + if (globalCursorManager->ownsCursorId(request.cursorid)) { + cursorManager = globalCursorManager; + } + else { + ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); + Collection* collection = ctx->getCollection(); + if (!collection) { + return appendCommandStatus(result, + Status(ErrorCodes::OperationFailed, + "collection dropped between getMore calls")); + } + cursorManager = collection->getCursorManager(); + } + + ClientCursorPin ccPin(cursorManager, request.cursorid); + ClientCursor* cursor = ccPin.c(); + if (!cursor) { + // We didn't find the cursor. + return appendCommandStatus(result, Status(ErrorCodes::CursorNotFound, str::stream() + << "Cursor not found, cursor id: " << request.cursorid)); + } + + if (request.nss.ns() != cursor->ns()) { + return appendCommandStatus(result, Status(ErrorCodes::Unauthorized, str::stream() + << "Requested getMore on namespace '" << request.nss.ns() + << "', but cursor belongs to a different namespace")); + } + + // On early return, get rid of the the cursor. + ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, ccPin); + + if (!cursor->hasRecoveryUnit()) { + // Start using a new RecoveryUnit. + cursor->setOwnedRecoveryUnit( + getGlobalEnvironment()->getGlobalStorageEngine()->newRecoveryUnit()); + } + + // Swap RecoveryUnit(s) between the ClientCursor and OperationContext. + ScopedRecoveryUnitSwapper ruSwapper(cursor, txn); + + // Reset timeout timer on the cursor since the cursor is still in use. + cursor->setIdleTime(0); + + // If the operation that spawned this cursor had a time limit set, apply leftover + // time to this getmore. + txn->getCurOp()->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros()); + txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. + + if (cursor->isAggCursor()) { + // Agg cursors handle their own locking internally. + ctx.reset(); // unlocks + } + + PlanExecutor* exec = cursor->getExecutor(); + exec->restoreState(txn); + + // TODO: Handle result sets larger than 16MB. + BSONArrayBuilder nextBatch; + BSONObj obj; + PlanExecutor::ExecState state; + int numResults = 0; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { + // Add result to output buffer. + nextBatch.append(obj); + numResults++; + + if (enoughForGetMore(request.batchSize, numResults, nextBatch.len())) { + break; + } + } + + // If we are operating on an aggregation cursor, then we dropped our collection lock + // earlier and need to reacquire it in order to clean up our ClientCursorPin. + // + // TODO: We need to ensure that this relock happens if we release the pin above in + // response to PlanExecutor::getNext() throwing an exception. + if (cursor->isAggCursor()) { + invariant(NULL == ctx.get()); + unpinDBLock.reset(new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS)); + unpinCollLock.reset( + new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS)); + } + + // Fail the command if the PlanExecutor reports execution failure. + if (PlanExecutor::FAILURE == state) { + const std::unique_ptr<PlanStageStats> stats(exec->getStats()); + error() << "GetMore executor error, stats: " << Explain::statsToBSON(*stats); + return appendCommandStatus(result, + Status(ErrorCodes::OperationFailed, + str::stream() << "GetMore executor error: " + << WorkingSetCommon::toStatusString(obj))); + } + + CursorId respondWithId = 0; + if (shouldSaveCursorGetMore(state, exec, isCursorTailable(cursor))) { + respondWithId = request.cursorid; + + exec->saveState(); + + cursor->setLeftoverMaxTimeMicros(txn->getCurOp()->getRemainingMaxTimeMicros()); + cursor->incPos(numResults); + + if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) { + // Rather than swapping their existing RU into the client cursor, tailable + // cursors should get a new recovery unit. + ruSwapper.dismiss(); + } + } + + Command::appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(), + &result); + if (respondWithId) { + cursorFreer.Dismiss(); + } + return true; + } + + } getMoreCmd; + +} // namespace mongo diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index bc9b62945e2..f8841b0e38b 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -21,6 +21,7 @@ env.Library( ], LIBDEPS=[ "explain_common", + "getmore_request", "index_bounds", "lite_parsed_query", "$BUILD_DIR/mongo/bson", @@ -95,6 +96,16 @@ env.Library( ) env.Library( + target="getmore_request", + source=[ + "getmore_request.cpp" + ], + LIBDEPS=[ + "$BUILD_DIR/mongo/bson", + ], +) + +env.Library( target="lite_parsed_query", source=[ "lite_parsed_query.cpp" diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index faf11902db9..6cd58d7f76e 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -72,6 +72,38 @@ namespace mongo { // Failpoint for checking whether we've received a getmore. MONGO_FP_DECLARE(failReceivedGetmore); + ScopedRecoveryUnitSwapper::ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn) + : _cc(cc), + _txn(txn), + _dismissed(false) { + // Save this for later. We restore it upon destruction. + _txn->recoveryUnit()->commitAndRestart(); + _txnPreviousRecoveryUnit.reset(txn->releaseRecoveryUnit()); + + // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx. + RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit(); + txn->setRecoveryUnit(ccRecoveryUnit); + } + + void ScopedRecoveryUnitSwapper::dismiss() { + _dismissed = true; + } + + ScopedRecoveryUnitSwapper::~ScopedRecoveryUnitSwapper() { + _txn->recoveryUnit()->commitAndRestart(); + + if (_dismissed) { + // Just clean up the recovery unit which we originally got from the ClientCursor. + delete _txn->releaseRecoveryUnit(); + } + else { + // Swap the RU back into the ClientCursor for subsequent getMores. + _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit()); + } + + _txn->setRecoveryUnit(_txnPreviousRecoveryUnit.release()); + } + /** * If ntoreturn is zero, we stop generating additional results as soon as we have either 101 * documents or at least 1MB of data. On subsequent getmores, there is no limit on the number @@ -89,6 +121,15 @@ namespace mongo { return numDocs >= pq.getNumToReturn() || bytesBuffered > MaxBytesToReturnToClientAtOnce; } + bool enoughForGetMore(int ntoreturn, int numDocs, int bytesBuffered) { + return (ntoreturn && numDocs >= ntoreturn) + || (bytesBuffered > MaxBytesToReturnToClientAtOnce); + } + + bool isCursorTailable(const ClientCursor* cursor) { + return cursor->queryOptions() & QueryOption_CursorTailable; + } + bool shouldSaveCursor(OperationContext* txn, const Collection* collection, PlanExecutor::ExecState finalState, @@ -119,6 +160,20 @@ namespace mongo { return !exec->isEOF(); } + bool shouldSaveCursorGetMore(PlanExecutor::ExecState finalState, + PlanExecutor* exec, + bool isTailable) { + if (PlanExecutor::FAILURE == finalState || PlanExecutor::DEAD == finalState) { + return false; + } + + if (isTailable) { + return true; + } + + return !exec->isEOF(); + } + void beginQueryOp(const NamespaceString& nss, const BSONObj& queryObj, int ntoreturn, @@ -179,30 +234,6 @@ namespace mongo { } } - struct ScopedRecoveryUnitSwapper { - explicit ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn) - : _cc(cc), _txn(txn) { - - // Save this for later. We restore it upon destruction. - _txn->recoveryUnit()->commitAndRestart(); - _txnPreviousRecoveryUnit = txn->releaseRecoveryUnit(); - - // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx. - RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit(); - txn->setRecoveryUnit(ccRecoveryUnit); - } - - ~ScopedRecoveryUnitSwapper() { - _txn->recoveryUnit()->commitAndRestart(); - _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit()); - _txn->setRecoveryUnit(_txnPreviousRecoveryUnit); - } - - ClientCursor* _cc; - OperationContext* _txn; - RecoveryUnit* _txnPreviousRecoveryUnit; - }; - /** * Called by db/instance.cpp. This is the getMore entry point. * @@ -377,23 +408,11 @@ namespace mongo { } } - if ((ntoreturn && numResults >= ntoreturn) - || bb.len() > MaxBytesToReturnToClientAtOnce) { + if (enoughForGetMore(ntoreturn, numResults, bb.len())) { break; } } - // We save the client cursor when there might be more results, and hence we may receive - // another getmore. If we receive a EOF or an error, or 'exec' is dead, then we know - // that we will not be producing more results. We indicate that the cursor is closed by - // sending a cursorId of 0 back to the client. - // - // On the other hand, if we retrieve all results necessary for this batch, then - // 'saveClientCursor' is true and we send a valid cursorId back to the client. In - // this case, there may or may not actually be more results (for example, the next call - // to getNext(...) might just return EOF). - bool saveClientCursor = false; - if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) { // Propagate this error to caller. if (PlanExecutor::FAILURE == state) { @@ -404,9 +423,6 @@ namespace mongo { WorkingSetCommon::toStatusString(obj)); } - // If we're dead there's no way to get more results. - saveClientCursor = false; - // In the old system tailable capped cursors would be killed off at the // cursorid level. If a tailable capped cursor is nuked the cursorid // would vanish. @@ -417,14 +433,6 @@ namespace mongo { resultFlags = ResultFlag_CursorNotFound; } } - else if (PlanExecutor::IS_EOF == state) { - // EOF is also end of the line unless it's tailable. - saveClientCursor = queryOptions & QueryOption_CursorTailable; - } - else { - verify(PlanExecutor::ADVANCED == state); - saveClientCursor = true; - } // If we are operating on an aggregation cursor, then we dropped our collection lock // earlier and need to reacquire it in order to clean up our ClientCursorPin. @@ -443,7 +451,7 @@ namespace mongo { // this case, the pin's destructor will be invoked, which will call release() on the // pin. Because our ClientCursorPin is declared after our lock is declared, this // will happen under the lock. - if (!saveClientCursor) { + if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) { ruSwapper.reset(); ccPin.deleteUnderlying(); // cc is now invalid, as is the executor @@ -464,8 +472,7 @@ namespace mongo { if (PlanExecutor::IS_EOF == state && (queryOptions & QueryOption_CursorTailable)) { if (!txn->getClient()->isInDirectClient()) { // Don't stash the RU. Get a new one on the next getMore. - ruSwapper.reset(); - delete cc->releaseOwnedRecoveryUnit(); + ruSwapper->dismiss(); } if ((queryOptions & QueryOption_AwaitData) diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h index 3bc177f011f..f62b89f5c0a 100644 --- a/src/mongo/db/query/find.h +++ b/src/mongo/db/query/find.h @@ -40,6 +40,26 @@ namespace mongo { class OperationContext; + class ScopedRecoveryUnitSwapper { + public: + ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn); + + ~ScopedRecoveryUnitSwapper(); + + /** + * Dismissing the RU swapper causes it to simply free the recovery unit rather than swapping + * it back into the ClientCursor. + */ + void dismiss(); + + private: + ClientCursor* _cc; + OperationContext* _txn; + bool _dismissed; + + std::unique_ptr<RecoveryUnit> _txnPreviousRecoveryUnit; + }; + /** * Returns true if enough results have been prepared to stop adding more to the first batch. * @@ -48,6 +68,18 @@ namespace mongo { bool enoughForFirstBatch(const LiteParsedQuery& pq, int numDocs, int bytesBuffered); /** + * Returns true if enough results have been prepared to stop adding more to a getMore batch. + * + * Should be called *after* adding to the result set rather than before. + */ + bool enoughForGetMore(int ntoreturn, int numDocs, int bytesBuffered); + + /** + * Whether or not the ClientCursor* is tailable. + */ + bool isCursorTailable(const ClientCursor* cursor); + + /** * Returns true if we should keep a cursor around because we're expecting to return more query * results. * @@ -60,6 +92,17 @@ namespace mongo { PlanExecutor* exec); /** + * Similar to shouldSaveCursor(), but used in getMore to determine whether we should keep + * the cursor around for additional getMores(). + * + * If false, the caller should close the cursor and indicate this to the client by sending back + * a cursor ID of 0. + */ + bool shouldSaveCursorGetMore(PlanExecutor::ExecState finalState, + PlanExecutor* exec, + bool isTailable); + + /** * Fills out CurOp with information about this query. */ void beginQueryOp(const NamespaceString& nss, diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp new file mode 100644 index 00000000000..e1e6d4d3d24 --- /dev/null +++ b/src/mongo/db/query/getmore_request.cpp @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/query/getmore_request.h" + +namespace mongo { + + const int GetMoreRequest::kDefaultBatchSize = 101; + + GetMoreRequest::GetMoreRequest() + : cursorid(0), + batchSize(0) { } + + GetMoreRequest::GetMoreRequest(const std::string& fullns, CursorId id, int sizeOfBatch) + : nss(fullns), + cursorid(id), + batchSize(sizeOfBatch) { } + + Status GetMoreRequest::isValid() const { + if (!nss.isValid()) { + return Status(ErrorCodes::BadValue, str::stream() + << "Invalid namespace for getMore: " << nss.ns()); + } + + if (cursorid == 0) { + return Status(ErrorCodes::BadValue, "Cursor id for getMore must be non-zero"); + } + + if (batchSize < 0) { + return Status(ErrorCodes::BadValue, str::stream() + << "Batch size for getMore must be non-negative, " + << "but received: " << batchSize); + } + + return Status::OK(); + } + + // static + std::string GetMoreRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) { + BSONElement collElt = cmdObj["collection"]; + const std::string coll = (collElt.type() == BSONType::String) ? collElt.String() + : ""; + + return str::stream() << dbname << "." << coll; + } + + // static + StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbname, + const BSONObj& cmdObj) { + if (!str::equals(cmdObj.firstElementFieldName(), "getMore")) { + return StatusWith<GetMoreRequest>(ErrorCodes::FailedToParse, str::stream() + << "First field name must be 'getMore' in: " << cmdObj); + } + + BSONElement cursorIdElt = cmdObj.firstElement(); + if (cursorIdElt.type() != BSONType::NumberLong) { + return StatusWith<GetMoreRequest>(ErrorCodes::TypeMismatch, str::stream() + << "Field 'getMore' must be of type long in: " << cmdObj); + } + const CursorId cursorid = cursorIdElt.Long(); + + BSONElement collElt = cmdObj["collection"]; + if (collElt.type() != BSONType::String) { + return StatusWith<GetMoreRequest>(ErrorCodes::TypeMismatch, str::stream() + << "Field 'collection' must be of type string in: " << cmdObj); + } + const std::string fullns = parseNs(dbname, cmdObj); + + int batchSize = kDefaultBatchSize; + BSONElement batchSizeElt = cmdObj["batchSize"]; + if (batchSizeElt.type() != BSONType::NumberInt && !batchSizeElt.eoo()) { + return StatusWith<GetMoreRequest>(ErrorCodes::TypeMismatch, str::stream() + << "Field 'batchSize' must be of type int in: " << cmdObj); + } + else if (!batchSizeElt.eoo()) { + batchSize = batchSizeElt.Int(); + } + + GetMoreRequest request(fullns, cursorid, batchSize); + Status validStatus = request.isValid(); + if (!validStatus.isOK()) { + return StatusWith<GetMoreRequest>(validStatus); + } + + return StatusWith<GetMoreRequest>(request); + } + +} // namespace mongo diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h new file mode 100644 index 00000000000..e7b340dc91a --- /dev/null +++ b/src/mongo/db/query/getmore_request.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/db/clientcursor.h" +#include "mongo/db/namespace_string.h" + +namespace mongo { + + struct GetMoreRequest { + /** + * Construct an empty request. + */ + GetMoreRequest(); + + /** + * Construct a GetMoreRequesst from the command specification and db name. + */ + static StatusWith<GetMoreRequest> parseFromBSON(const std::string& dbname, + const BSONObj& cmdObj); + + static std::string parseNs(const std::string& dbname, const BSONObj& cmdObj); + + const NamespaceString nss; + const CursorId cursorid; + const int batchSize; + + static const int kDefaultBatchSize; + + private: + /** + * Construct from parsed BSON + */ + GetMoreRequest(const std::string& fullns, CursorId id, int batch); + + /** + * Returns a non-OK status if there are semantic errors in the parsed request + * (e.g. a negative batchSize). + */ + Status isValid() const; + }; + +} // namespace mongo |