summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2015-03-12 19:23:39 -0400
committerDavid Storch <david.storch@10gen.com>2015-03-27 18:29:57 -0400
commit617f0bd0803ecb2d40c138d363149dc8b46d6345 (patch)
tree593e6049b26622b09dda887671cbc08ad39899c9 /src
parent4e2bd6f18b2fc9b27465e098331562e0f06fc40d (diff)
downloadmongo-617f0bd0803ecb2d40c138d363149dc8b46d6345.tar.gz
SERVER-17283 initial getMore command implementation
Diffstat (limited to 'src')
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/commands/find_cmd.cpp53
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp277
-rw-r--r--src/mongo/db/query/SConscript11
-rw-r--r--src/mongo/db/query/find.cpp109
-rw-r--r--src/mongo/db/query/find.h43
-rw-r--r--src/mongo/db/query/getmore_request.cpp117
-rw-r--r--src/mongo/db/query/getmore_request.h73
8 files changed, 611 insertions, 73 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 660ae1c2522..6826423bced 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -734,6 +734,7 @@ serverOnlyFiles = [ "db/background.cpp",
"db/commands/fsync.cpp",
"db/commands/geo_near_cmd.cpp",
"db/commands/get_last_error.cpp",
+ "db/commands/getmore_cmd.cpp",
"db/commands/group.cpp",
"db/commands/index_filter_commands.cpp",
"db/commands/list_collections.cpp",
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 3d82789d910..731267970ff 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -32,6 +32,7 @@
#include <memory>
+#include "mongo/base/disallow_copying.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
@@ -56,20 +57,21 @@ namespace mongo {
* A command for running .find() queries.
*/
class FindCmd : public Command {
+ MONGO_DISALLOW_COPYING(FindCmd);
public:
FindCmd() : Command("find") { }
- virtual bool isWriteCommandForConfigServer() const { return false; }
+ bool isWriteCommandForConfigServer() const override { return false; }
- virtual bool slaveOk() const { return false; }
+ bool slaveOk() const override { return false; }
- virtual bool slaveOverrideOk() const { return true; }
+ bool slaveOverrideOk() const override { return true; }
- virtual bool maintenanceOk() const { return false; }
+ bool maintenanceOk() const override { return false; }
- virtual bool adminOnly() const { return false; }
+ bool adminOnly() const override { return false; }
- virtual void help(std::stringstream& help) const {
+ void help(std::stringstream& help) const override {
help << "query for documents";
}
@@ -77,11 +79,11 @@ namespace mongo {
* A find command does not increment the command counter, but rather increments the
* query counter.
*/
- bool shouldAffectCommandCounter() const { return false; }
+ bool shouldAffectCommandCounter() const override { return false; }
- virtual Status checkAuthForCommand(ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
AuthorizationSession* authzSession = client->getAuthorizationSession();
ResourcePattern pattern = parseResourcePattern(dbname, cmdObj);
@@ -92,11 +94,11 @@ namespace mongo {
return Status(ErrorCodes::Unauthorized, "unauthorized");
}
- virtual Status explain(OperationContext* txn,
- const std::string& dbname,
- const BSONObj& cmdObj,
- ExplainCommon::Verbosity verbosity,
- BSONObjBuilder* out) const {
+ Status explain(OperationContext* txn,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ ExplainCommon::Verbosity verbosity,
+ BSONObjBuilder* out) const override {
const std::string fullns = parseNs(dbname, cmdObj);
const NamespaceString nss(fullns);
@@ -166,19 +168,26 @@ namespace mongo {
* (i.e. call to shardingState.needCollectionMetadata() below), shard version
* information should be passed as part of the command parameter.
*/
- virtual bool run(OperationContext* txn,
- const std::string& dbname,
- BSONObj& cmdObj,
- int options,
- std::string& errmsg,
- BSONObjBuilder& result,
- bool fromRepl) {
+ bool run(OperationContext* txn,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result,
+ bool fromRepl) override {
const std::string fullns = parseNs(dbname, cmdObj);
const NamespaceString nss(fullns);
// Although it is a command, a find command gets counted as a query.
globalOpCounters.gotQuery();
+ if (txn->getClient()->isInDirectClient()) {
+ return appendCommandStatus(result,
+ Status(ErrorCodes::IllegalOperation,
+ "Cannot run find command from "
+ "inside DBDirectClient"));
+ }
+
// 1a) Parse the command BSON to a LiteParsedQuery.
std::unique_ptr<LiteParsedQuery> lpq;
{
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
new file mode 100644
index 00000000000..308146354dc
--- /dev/null
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -0,0 +1,277 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+#include <string>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/audit.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/cursor_manager.h"
+#include "mongo/db/clientcursor.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/global_environment_experiment.h"
+#include "mongo/db/query/find.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/stats/counters.h"
+#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+ /**
+ * A command for running getMore() against an existing cursor registered with a
+ * CursorManager.
+ *
+ * Can be used in combination with any cursor-generating command (e.g. find, aggregate,
+ * listIndexes).
+ */
+ class GetMoreCmd : public Command {
+ MONGO_DISALLOW_COPYING(GetMoreCmd);
+ public:
+ GetMoreCmd() : Command("getMore") { }
+
+ bool isWriteCommandForConfigServer() const override { return false; }
+
+ bool slaveOk() const override { return false; }
+
+ bool slaveOverrideOk() const override { return true; }
+
+ bool maintenanceOk() const override { return false; }
+
+ bool adminOnly() const override { return false; }
+
+ void help(std::stringstream& help) const override {
+ help << "retrieve more results from an existing cursor";
+ }
+
+ /**
+ * A getMore command increments the getMore counter, not the command counter.
+ */
+ bool shouldAffectCommandCounter() const override { return false; }
+
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+ return GetMoreRequest::parseNs(dbname, cmdObj);
+ }
+
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
+ StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
+ if (!parseStatus.isOK()) {
+ return parseStatus.getStatus();
+ }
+
+ const GetMoreRequest& request = parseStatus.getValue();
+
+ Status authzStatus = client->getAuthorizationSession()->checkAuthForGetMore(
+ request.nss, request.cursorid);
+ audit::logGetMoreAuthzCheck(client, request.nss, request.cursorid, authzStatus.code());
+
+ return authzStatus;
+ }
+
+ /**
+ * Generates the next batch of results for a ClientCursor.
+ *
+ * TODO: Do we need to support some equivalent of OP_REPLY responseFlags?
+ *
+ * TODO: Is it possible to support awaitData?
+ */
+ bool run(OperationContext* txn,
+ const std::string& dbname,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result,
+ bool fromRepl) override {
+ // Counted as a getMore, not as a command.
+ globalOpCounters.gotGetMore();
+
+ if (txn->getClient()->isInDirectClient()) {
+ return appendCommandStatus(result,
+ Status(ErrorCodes::IllegalOperation,
+ "Cannot run getMore command from "
+ "inside DBDirectClient"));
+ }
+
+ StatusWith<GetMoreRequest> parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
+ if (!parseStatus.isOK()) {
+ return appendCommandStatus(result, parseStatus.getStatus());
+ }
+ const GetMoreRequest& request = parseStatus.getValue();
+
+ // Depending on the type of cursor being operated on, we hold locks for the whole
+ // getMore, or none of the getMore, or part of the getMore. The three cases in detail:
+ //
+ // 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
+ // 2) Cursor owned by global cursor manager: we don't lock anything. These cursors
+ // don't own any collection state.
+ // 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
+ // "unpinCollLock". This is because agg cursors handle locking internally (hence the
+ // release), but the pin and unpin of the cursor must occur under the collection
+ // lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because
+ // AutoGetCollectionForRead checks the sharding version (and we want the relock for
+ // the unpin to succeed even if the sharding version has changed).
+ //
+ // Note that we declare our locks before our ClientCursorPin, in order to ensure that
+ // the pin's destructor is called before the lock destructors (so that the unpin occurs
+ // under the lock).
+ std::unique_ptr<AutoGetCollectionForRead> ctx;
+ std::unique_ptr<Lock::DBLock> unpinDBLock;
+ std::unique_ptr<Lock::CollectionLock> unpinCollLock;
+
+ CursorManager* cursorManager;
+ CursorManager* globalCursorManager = CursorManager::getGlobalCursorManager();
+ if (globalCursorManager->ownsCursorId(request.cursorid)) {
+ cursorManager = globalCursorManager;
+ }
+ else {
+ ctx.reset(new AutoGetCollectionForRead(txn, request.nss));
+ Collection* collection = ctx->getCollection();
+ if (!collection) {
+ return appendCommandStatus(result,
+ Status(ErrorCodes::OperationFailed,
+ "collection dropped between getMore calls"));
+ }
+ cursorManager = collection->getCursorManager();
+ }
+
+ ClientCursorPin ccPin(cursorManager, request.cursorid);
+ ClientCursor* cursor = ccPin.c();
+ if (!cursor) {
+ // We didn't find the cursor.
+ return appendCommandStatus(result, Status(ErrorCodes::CursorNotFound, str::stream()
+ << "Cursor not found, cursor id: " << request.cursorid));
+ }
+
+ if (request.nss.ns() != cursor->ns()) {
+ return appendCommandStatus(result, Status(ErrorCodes::Unauthorized, str::stream()
+ << "Requested getMore on namespace '" << request.nss.ns()
+ << "', but cursor belongs to a different namespace"));
+ }
+
+ // On early return, get rid of the the cursor.
+ ScopeGuard cursorFreer = MakeGuard(&ClientCursorPin::deleteUnderlying, ccPin);
+
+ if (!cursor->hasRecoveryUnit()) {
+ // Start using a new RecoveryUnit.
+ cursor->setOwnedRecoveryUnit(
+ getGlobalEnvironment()->getGlobalStorageEngine()->newRecoveryUnit());
+ }
+
+ // Swap RecoveryUnit(s) between the ClientCursor and OperationContext.
+ ScopedRecoveryUnitSwapper ruSwapper(cursor, txn);
+
+ // Reset timeout timer on the cursor since the cursor is still in use.
+ cursor->setIdleTime(0);
+
+ // If the operation that spawned this cursor had a time limit set, apply leftover
+ // time to this getmore.
+ txn->getCurOp()->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros());
+ txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
+
+ if (cursor->isAggCursor()) {
+ // Agg cursors handle their own locking internally.
+ ctx.reset(); // unlocks
+ }
+
+ PlanExecutor* exec = cursor->getExecutor();
+ exec->restoreState(txn);
+
+ // TODO: Handle result sets larger than 16MB.
+ BSONArrayBuilder nextBatch;
+ BSONObj obj;
+ PlanExecutor::ExecState state;
+ int numResults = 0;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
+ // Add result to output buffer.
+ nextBatch.append(obj);
+ numResults++;
+
+ if (enoughForGetMore(request.batchSize, numResults, nextBatch.len())) {
+ break;
+ }
+ }
+
+ // If we are operating on an aggregation cursor, then we dropped our collection lock
+ // earlier and need to reacquire it in order to clean up our ClientCursorPin.
+ //
+ // TODO: We need to ensure that this relock happens if we release the pin above in
+ // response to PlanExecutor::getNext() throwing an exception.
+ if (cursor->isAggCursor()) {
+ invariant(NULL == ctx.get());
+ unpinDBLock.reset(new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS));
+ unpinCollLock.reset(
+ new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS));
+ }
+
+ // Fail the command if the PlanExecutor reports execution failure.
+ if (PlanExecutor::FAILURE == state) {
+ const std::unique_ptr<PlanStageStats> stats(exec->getStats());
+ error() << "GetMore executor error, stats: " << Explain::statsToBSON(*stats);
+ return appendCommandStatus(result,
+ Status(ErrorCodes::OperationFailed,
+ str::stream() << "GetMore executor error: "
+ << WorkingSetCommon::toStatusString(obj)));
+ }
+
+ CursorId respondWithId = 0;
+ if (shouldSaveCursorGetMore(state, exec, isCursorTailable(cursor))) {
+ respondWithId = request.cursorid;
+
+ exec->saveState();
+
+ cursor->setLeftoverMaxTimeMicros(txn->getCurOp()->getRemainingMaxTimeMicros());
+ cursor->incPos(numResults);
+
+ if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) {
+ // Rather than swapping their existing RU into the client cursor, tailable
+ // cursors should get a new recovery unit.
+ ruSwapper.dismiss();
+ }
+ }
+
+ Command::appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(),
+ &result);
+ if (respondWithId) {
+ cursorFreer.Dismiss();
+ }
+ return true;
+ }
+
+ } getMoreCmd;
+
+} // namespace mongo
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index bc9b62945e2..f8841b0e38b 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -21,6 +21,7 @@ env.Library(
],
LIBDEPS=[
"explain_common",
+ "getmore_request",
"index_bounds",
"lite_parsed_query",
"$BUILD_DIR/mongo/bson",
@@ -95,6 +96,16 @@ env.Library(
)
env.Library(
+ target="getmore_request",
+ source=[
+ "getmore_request.cpp"
+ ],
+ LIBDEPS=[
+ "$BUILD_DIR/mongo/bson",
+ ],
+)
+
+env.Library(
target="lite_parsed_query",
source=[
"lite_parsed_query.cpp"
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index faf11902db9..6cd58d7f76e 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -72,6 +72,38 @@ namespace mongo {
// Failpoint for checking whether we've received a getmore.
MONGO_FP_DECLARE(failReceivedGetmore);
+ ScopedRecoveryUnitSwapper::ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn)
+ : _cc(cc),
+ _txn(txn),
+ _dismissed(false) {
+ // Save this for later. We restore it upon destruction.
+ _txn->recoveryUnit()->commitAndRestart();
+ _txnPreviousRecoveryUnit.reset(txn->releaseRecoveryUnit());
+
+ // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx.
+ RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit();
+ txn->setRecoveryUnit(ccRecoveryUnit);
+ }
+
+ void ScopedRecoveryUnitSwapper::dismiss() {
+ _dismissed = true;
+ }
+
+ ScopedRecoveryUnitSwapper::~ScopedRecoveryUnitSwapper() {
+ _txn->recoveryUnit()->commitAndRestart();
+
+ if (_dismissed) {
+ // Just clean up the recovery unit which we originally got from the ClientCursor.
+ delete _txn->releaseRecoveryUnit();
+ }
+ else {
+ // Swap the RU back into the ClientCursor for subsequent getMores.
+ _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit());
+ }
+
+ _txn->setRecoveryUnit(_txnPreviousRecoveryUnit.release());
+ }
+
/**
* If ntoreturn is zero, we stop generating additional results as soon as we have either 101
* documents or at least 1MB of data. On subsequent getmores, there is no limit on the number
@@ -89,6 +121,15 @@ namespace mongo {
return numDocs >= pq.getNumToReturn() || bytesBuffered > MaxBytesToReturnToClientAtOnce;
}
+ bool enoughForGetMore(int ntoreturn, int numDocs, int bytesBuffered) {
+ return (ntoreturn && numDocs >= ntoreturn)
+ || (bytesBuffered > MaxBytesToReturnToClientAtOnce);
+ }
+
+ bool isCursorTailable(const ClientCursor* cursor) {
+ return cursor->queryOptions() & QueryOption_CursorTailable;
+ }
+
bool shouldSaveCursor(OperationContext* txn,
const Collection* collection,
PlanExecutor::ExecState finalState,
@@ -119,6 +160,20 @@ namespace mongo {
return !exec->isEOF();
}
+ bool shouldSaveCursorGetMore(PlanExecutor::ExecState finalState,
+ PlanExecutor* exec,
+ bool isTailable) {
+ if (PlanExecutor::FAILURE == finalState || PlanExecutor::DEAD == finalState) {
+ return false;
+ }
+
+ if (isTailable) {
+ return true;
+ }
+
+ return !exec->isEOF();
+ }
+
void beginQueryOp(const NamespaceString& nss,
const BSONObj& queryObj,
int ntoreturn,
@@ -179,30 +234,6 @@ namespace mongo {
}
}
- struct ScopedRecoveryUnitSwapper {
- explicit ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn)
- : _cc(cc), _txn(txn) {
-
- // Save this for later. We restore it upon destruction.
- _txn->recoveryUnit()->commitAndRestart();
- _txnPreviousRecoveryUnit = txn->releaseRecoveryUnit();
-
- // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx.
- RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit();
- txn->setRecoveryUnit(ccRecoveryUnit);
- }
-
- ~ScopedRecoveryUnitSwapper() {
- _txn->recoveryUnit()->commitAndRestart();
- _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit());
- _txn->setRecoveryUnit(_txnPreviousRecoveryUnit);
- }
-
- ClientCursor* _cc;
- OperationContext* _txn;
- RecoveryUnit* _txnPreviousRecoveryUnit;
- };
-
/**
* Called by db/instance.cpp. This is the getMore entry point.
*
@@ -377,23 +408,11 @@ namespace mongo {
}
}
- if ((ntoreturn && numResults >= ntoreturn)
- || bb.len() > MaxBytesToReturnToClientAtOnce) {
+ if (enoughForGetMore(ntoreturn, numResults, bb.len())) {
break;
}
}
- // We save the client cursor when there might be more results, and hence we may receive
- // another getmore. If we receive a EOF or an error, or 'exec' is dead, then we know
- // that we will not be producing more results. We indicate that the cursor is closed by
- // sending a cursorId of 0 back to the client.
- //
- // On the other hand, if we retrieve all results necessary for this batch, then
- // 'saveClientCursor' is true and we send a valid cursorId back to the client. In
- // this case, there may or may not actually be more results (for example, the next call
- // to getNext(...) might just return EOF).
- bool saveClientCursor = false;
-
if (PlanExecutor::DEAD == state || PlanExecutor::FAILURE == state) {
// Propagate this error to caller.
if (PlanExecutor::FAILURE == state) {
@@ -404,9 +423,6 @@ namespace mongo {
WorkingSetCommon::toStatusString(obj));
}
- // If we're dead there's no way to get more results.
- saveClientCursor = false;
-
// In the old system tailable capped cursors would be killed off at the
// cursorid level. If a tailable capped cursor is nuked the cursorid
// would vanish.
@@ -417,14 +433,6 @@ namespace mongo {
resultFlags = ResultFlag_CursorNotFound;
}
}
- else if (PlanExecutor::IS_EOF == state) {
- // EOF is also end of the line unless it's tailable.
- saveClientCursor = queryOptions & QueryOption_CursorTailable;
- }
- else {
- verify(PlanExecutor::ADVANCED == state);
- saveClientCursor = true;
- }
// If we are operating on an aggregation cursor, then we dropped our collection lock
// earlier and need to reacquire it in order to clean up our ClientCursorPin.
@@ -443,7 +451,7 @@ namespace mongo {
// this case, the pin's destructor will be invoked, which will call release() on the
// pin. Because our ClientCursorPin is declared after our lock is declared, this
// will happen under the lock.
- if (!saveClientCursor) {
+ if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) {
ruSwapper.reset();
ccPin.deleteUnderlying();
// cc is now invalid, as is the executor
@@ -464,8 +472,7 @@ namespace mongo {
if (PlanExecutor::IS_EOF == state && (queryOptions & QueryOption_CursorTailable)) {
if (!txn->getClient()->isInDirectClient()) {
// Don't stash the RU. Get a new one on the next getMore.
- ruSwapper.reset();
- delete cc->releaseOwnedRecoveryUnit();
+ ruSwapper->dismiss();
}
if ((queryOptions & QueryOption_AwaitData)
diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h
index 3bc177f011f..f62b89f5c0a 100644
--- a/src/mongo/db/query/find.h
+++ b/src/mongo/db/query/find.h
@@ -40,6 +40,26 @@ namespace mongo {
class OperationContext;
+ class ScopedRecoveryUnitSwapper {
+ public:
+ ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn);
+
+ ~ScopedRecoveryUnitSwapper();
+
+ /**
+ * Dismissing the RU swapper causes it to simply free the recovery unit rather than swapping
+ * it back into the ClientCursor.
+ */
+ void dismiss();
+
+ private:
+ ClientCursor* _cc;
+ OperationContext* _txn;
+ bool _dismissed;
+
+ std::unique_ptr<RecoveryUnit> _txnPreviousRecoveryUnit;
+ };
+
/**
* Returns true if enough results have been prepared to stop adding more to the first batch.
*
@@ -48,6 +68,18 @@ namespace mongo {
bool enoughForFirstBatch(const LiteParsedQuery& pq, int numDocs, int bytesBuffered);
/**
+ * Returns true if enough results have been prepared to stop adding more to a getMore batch.
+ *
+ * Should be called *after* adding to the result set rather than before.
+ */
+ bool enoughForGetMore(int ntoreturn, int numDocs, int bytesBuffered);
+
+ /**
+ * Whether or not the ClientCursor* is tailable.
+ */
+ bool isCursorTailable(const ClientCursor* cursor);
+
+ /**
* Returns true if we should keep a cursor around because we're expecting to return more query
* results.
*
@@ -60,6 +92,17 @@ namespace mongo {
PlanExecutor* exec);
/**
+ * Similar to shouldSaveCursor(), but used in getMore to determine whether we should keep
+ * the cursor around for additional getMores().
+ *
+ * If false, the caller should close the cursor and indicate this to the client by sending back
+ * a cursor ID of 0.
+ */
+ bool shouldSaveCursorGetMore(PlanExecutor::ExecState finalState,
+ PlanExecutor* exec,
+ bool isTailable);
+
+ /**
* Fills out CurOp with information about this query.
*/
void beginQueryOp(const NamespaceString& nss,
diff --git a/src/mongo/db/query/getmore_request.cpp b/src/mongo/db/query/getmore_request.cpp
new file mode 100644
index 00000000000..e1e6d4d3d24
--- /dev/null
+++ b/src/mongo/db/query/getmore_request.cpp
@@ -0,0 +1,117 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/query/getmore_request.h"
+
+namespace mongo {
+
+ const int GetMoreRequest::kDefaultBatchSize = 101;
+
+ GetMoreRequest::GetMoreRequest()
+ : cursorid(0),
+ batchSize(0) { }
+
+ GetMoreRequest::GetMoreRequest(const std::string& fullns, CursorId id, int sizeOfBatch)
+ : nss(fullns),
+ cursorid(id),
+ batchSize(sizeOfBatch) { }
+
+ Status GetMoreRequest::isValid() const {
+ if (!nss.isValid()) {
+ return Status(ErrorCodes::BadValue, str::stream()
+ << "Invalid namespace for getMore: " << nss.ns());
+ }
+
+ if (cursorid == 0) {
+ return Status(ErrorCodes::BadValue, "Cursor id for getMore must be non-zero");
+ }
+
+ if (batchSize < 0) {
+ return Status(ErrorCodes::BadValue, str::stream()
+ << "Batch size for getMore must be non-negative, "
+ << "but received: " << batchSize);
+ }
+
+ return Status::OK();
+ }
+
+ // static
+ std::string GetMoreRequest::parseNs(const std::string& dbname, const BSONObj& cmdObj) {
+ BSONElement collElt = cmdObj["collection"];
+ const std::string coll = (collElt.type() == BSONType::String) ? collElt.String()
+ : "";
+
+ return str::stream() << dbname << "." << coll;
+ }
+
+ // static
+ StatusWith<GetMoreRequest> GetMoreRequest::parseFromBSON(const std::string& dbname,
+ const BSONObj& cmdObj) {
+ if (!str::equals(cmdObj.firstElementFieldName(), "getMore")) {
+ return StatusWith<GetMoreRequest>(ErrorCodes::FailedToParse, str::stream()
+ << "First field name must be 'getMore' in: " << cmdObj);
+ }
+
+ BSONElement cursorIdElt = cmdObj.firstElement();
+ if (cursorIdElt.type() != BSONType::NumberLong) {
+ return StatusWith<GetMoreRequest>(ErrorCodes::TypeMismatch, str::stream()
+ << "Field 'getMore' must be of type long in: " << cmdObj);
+ }
+ const CursorId cursorid = cursorIdElt.Long();
+
+ BSONElement collElt = cmdObj["collection"];
+ if (collElt.type() != BSONType::String) {
+ return StatusWith<GetMoreRequest>(ErrorCodes::TypeMismatch, str::stream()
+ << "Field 'collection' must be of type string in: " << cmdObj);
+ }
+ const std::string fullns = parseNs(dbname, cmdObj);
+
+ int batchSize = kDefaultBatchSize;
+ BSONElement batchSizeElt = cmdObj["batchSize"];
+ if (batchSizeElt.type() != BSONType::NumberInt && !batchSizeElt.eoo()) {
+ return StatusWith<GetMoreRequest>(ErrorCodes::TypeMismatch, str::stream()
+ << "Field 'batchSize' must be of type int in: " << cmdObj);
+ }
+ else if (!batchSizeElt.eoo()) {
+ batchSize = batchSizeElt.Int();
+ }
+
+ GetMoreRequest request(fullns, cursorid, batchSize);
+ Status validStatus = request.isValid();
+ if (!validStatus.isOK()) {
+ return StatusWith<GetMoreRequest>(validStatus);
+ }
+
+ return StatusWith<GetMoreRequest>(request);
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/query/getmore_request.h b/src/mongo/db/query/getmore_request.h
new file mode 100644
index 00000000000..e7b340dc91a
--- /dev/null
+++ b/src/mongo/db/query/getmore_request.h
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/clientcursor.h"
+#include "mongo/db/namespace_string.h"
+
+namespace mongo {
+
+ struct GetMoreRequest {
+ /**
+ * Construct an empty request.
+ */
+ GetMoreRequest();
+
+ /**
+ * Construct a GetMoreRequesst from the command specification and db name.
+ */
+ static StatusWith<GetMoreRequest> parseFromBSON(const std::string& dbname,
+ const BSONObj& cmdObj);
+
+ static std::string parseNs(const std::string& dbname, const BSONObj& cmdObj);
+
+ const NamespaceString nss;
+ const CursorId cursorid;
+ const int batchSize;
+
+ static const int kDefaultBatchSize;
+
+ private:
+ /**
+ * Construct from parsed BSON
+ */
+ GetMoreRequest(const std::string& fullns, CursorId id, int batch);
+
+ /**
+ * Returns a non-OK status if there are semantic errors in the parsed request
+ * (e.g. a negative batchSize).
+ */
+ Status isValid() const;
+ };
+
+} // namespace mongo