/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
#include "mongo/platform/basic.h"
#include
#include
#include "mongo/base/disallow_copying.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/cursor_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/cursor_responses.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/query/find.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
/**
* A command for running getMore() against an existing cursor registered with a CursorManager.
* Used to generate the next batch of results for a ClientCursor.
*
* Can be used in combination with any cursor-generating command (e.g. find, aggregate,
* listIndexes).
*/
class GetMoreCmd : public Command {
MONGO_DISALLOW_COPYING(GetMoreCmd);
public:
GetMoreCmd() : Command("getMore") { }
bool isWriteCommandForConfigServer() const override { return false; }
bool slaveOk() const override { return false; }
bool slaveOverrideOk() const override { return true; }
bool maintenanceOk() const override { return false; }
bool adminOnly() const override { return false; }
void help(std::stringstream& help) const override {
help << "retrieve more results from an existing cursor";
}
/**
* A getMore command increments the getMore counter, not the command counter.
*/
bool shouldAffectCommandCounter() const override { return false; }
std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return GetMoreRequest::parseNs(dbname, cmdObj);
}
Status checkAuthForCommand(ClientBasic* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
StatusWith parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
if (!parseStatus.isOK()) {
return parseStatus.getStatus();
}
const GetMoreRequest& request = parseStatus.getValue();
return AuthorizationSession::get(client)->checkAuthForGetMore(request.nss,
request.cursorid);
}
bool run(OperationContext* txn,
const std::string& dbname,
BSONObj& cmdObj,
int options,
std::string& errmsg,
BSONObjBuilder& result) override {
// Counted as a getMore, not as a command.
globalOpCounters.gotGetMore();
if (txn->getClient()->isInDirectClient()) {
return appendCommandStatus(result,
Status(ErrorCodes::IllegalOperation,
"Cannot run getMore command from eval()"));
}
StatusWith parseStatus = GetMoreRequest::parseFromBSON(dbname, cmdObj);
if (!parseStatus.isOK()) {
return appendCommandStatus(result, parseStatus.getStatus());
}
const GetMoreRequest& request = parseStatus.getValue();
// Depending on the type of cursor being operated on, we hold locks for the whole
// getMore, or none of the getMore, or part of the getMore. The three cases in detail:
//
// 1) Normal cursor: we lock with "ctx" and hold it for the whole getMore.
// 2) Cursor owned by global cursor manager: we don't lock anything. These cursors
// don't own any collection state.
// 3) Agg cursor: we lock with "ctx", then release, then relock with "unpinDBLock" and
// "unpinCollLock". This is because agg cursors handle locking internally (hence the
// release), but the pin and unpin of the cursor must occur under the collection
// lock. We don't use our AutoGetCollectionForRead "ctx" to relock, because
// AutoGetCollectionForRead checks the sharding version (and we want the relock for
// the unpin to succeed even if the sharding version has changed).
//
// Note that we declare our locks before our ClientCursorPin, in order to ensure that
// the pin's destructor is called before the lock destructors (so that the unpin occurs
// under the lock).
std::unique_ptr ctx;
std::unique_ptr unpinDBLock;
std::unique_ptr unpinCollLock;
CursorManager* cursorManager;
CursorManager* globalCursorManager = CursorManager::getGlobalCursorManager();
if (globalCursorManager->ownsCursorId(request.cursorid)) {
cursorManager = globalCursorManager;
}
else {
ctx.reset(new AutoGetCollectionForRead(txn, request.nss));
Collection* collection = ctx->getCollection();
if (!collection) {
return appendCommandStatus(result,
Status(ErrorCodes::OperationFailed,
"collection dropped between getMore calls"));
}
cursorManager = collection->getCursorManager();
}
ClientCursorPin ccPin(cursorManager, request.cursorid);
ClientCursor* cursor = ccPin.c();
if (!cursor) {
// We didn't find the cursor.
return appendCommandStatus(result, Status(ErrorCodes::CursorNotFound, str::stream()
<< "Cursor not found, cursor id: " << request.cursorid));
}
if (request.nss.ns() != cursor->ns()) {
return appendCommandStatus(result, Status(ErrorCodes::Unauthorized, str::stream()
<< "Requested getMore on namespace '" << request.nss.ns()
<< "', but cursor belongs to a different namespace"));
}
const bool hasOwnMaxTime = CurOp::get(txn)->isMaxTimeSet();
// Validation related to awaitData.
if (isCursorAwaitData(cursor)) {
invariant(isCursorTailable(cursor));
if (!hasOwnMaxTime) {
Status status(ErrorCodes::BadValue,
str::stream() << "Must set maxTimeMS on a getMore if the initial "
<< "query had 'awaitData' set: " << cmdObj);
return appendCommandStatus(result, status);
}
if (cursor->isAggCursor()) {
Status status(ErrorCodes::BadValue,
"awaitData cannot be set on an aggregation cursor");
return appendCommandStatus(result, status);
}
}
// On early return, get rid of the cursor.
ScopeGuard cursorFreer = MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin, request);
if (!cursor->hasRecoveryUnit()) {
// Start using a new RecoveryUnit.
cursor->setOwnedRecoveryUnit(
getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit());
}
// Swap RecoveryUnit(s) between the ClientCursor and OperationContext.
ScopedRecoveryUnitSwapper ruSwapper(cursor, txn);
// Reset timeout timer on the cursor since the cursor is still in use.
cursor->setIdleTime(0);
// If there is no time limit set directly on this getMore command, but the operation
// that spawned this cursor had a time limit set, then we have to apply any leftover
// time to this getMore.
if (!hasOwnMaxTime) {
CurOp::get(txn)->setMaxTimeMicros(cursor->getLeftoverMaxTimeMicros());
}
txn->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
if (cursor->isAggCursor()) {
// Agg cursors handle their own locking internally.
ctx.reset(); // unlocks
}
PlanExecutor* exec = cursor->getExecutor();
exec->restoreState(txn);
// If we're tailing a capped collection, retrieve a monotonically increasing insert
// counter.
uint64_t lastInsertCount = 0;
if (isCursorAwaitData(cursor)) {
invariant(ctx->getCollection()->isCapped());
lastInsertCount = ctx->getCollection()->getCappedInsertNotifier()->getCount();
}
CursorId respondWithId = 0;
BSONArrayBuilder nextBatch;
BSONObj obj;
PlanExecutor::ExecState state;
int numResults = 0;
Status batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
if (!batchStatus.isOK()) {
return appendCommandStatus(result, batchStatus);
}
// If this is an await data cursor, and we hit EOF without generating any results, then
// we block waiting for new oplog data to arrive.
if (isCursorAwaitData(cursor) && state == PlanExecutor::IS_EOF && numResults == 0) {
// Retrieve the notifier which we will wait on until new data arrives. We make sure
// to do this in the lock because once we drop the lock it is possible for the
// collection to become invalid. The notifier itself will outlive the collection if
// the collection is dropped, as we keep a shared_ptr to it.
auto notifier = ctx->getCollection()->getCappedInsertNotifier();
// Save the PlanExecutor and drop our locks.
exec->saveState();
ctx.reset();
// Block waiting for data.
Microseconds timeout(CurOp::get(txn)->getRemainingMaxTimeMicros());
notifier->waitForInsert(lastInsertCount, timeout);
notifier.reset();
ctx.reset(new AutoGetCollectionForRead(txn, request.nss));
exec->restoreState(txn);
// We woke up because either the timed_wait expired, or there was more data. Either
// way, attempt to generate another batch of results.
batchStatus = generateBatch(cursor, request, &nextBatch, &state, &numResults);
if (!batchStatus.isOK()) {
return appendCommandStatus(result, batchStatus);
}
}
if (shouldSaveCursorGetMore(state, exec, isCursorTailable(cursor))) {
respondWithId = request.cursorid;
exec->saveState();
// If maxTimeMS was set directly on the getMore rather than being rolled over
// from a previous find, then don't roll remaining micros over to the next
// getMore.
if (!hasOwnMaxTime) {
cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros());
}
cursor->incPos(numResults);
if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) {
// Rather than swapping their existing RU into the client cursor, tailable
// cursors should get a new recovery unit.
ruSwapper.dismiss();
}
}
else {
CurOp::get(txn)->debug().cursorExhausted = true;
}
appendGetMoreResponseObject(respondWithId, request.nss.ns(), nextBatch.arr(), &result);
if (respondWithId) {
cursorFreer.Dismiss();
// If we are operating on an aggregation cursor, then we dropped our collection lock
// earlier and need to reacquire it in order to clean up our ClientCursorPin.
if (cursor->isAggCursor()) {
invariant(NULL == ctx.get());
unpinDBLock.reset(
new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS));
unpinCollLock.reset(
new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS));
}
}
return true;
}
/**
* Uses 'cursor' and 'request' to fill out 'nextBatch' with the batch of result documents to
* be returned by this getMore.
*
* Returns the number of documents in the batch in *numResults, which must be initialized to
* zero by the caller. Returns the final ExecState returned by the cursor in *state.
*
* Returns an OK status if the batch was successfully generated, and a non-OK status if the
* PlanExecutor encounters a failure.
*/
Status generateBatch(ClientCursor* cursor,
const GetMoreRequest& request,
BSONArrayBuilder* nextBatch,
PlanExecutor::ExecState* state,
int* numResults) {
PlanExecutor* exec = cursor->getExecutor();
const bool isAwaitData = isCursorAwaitData(cursor);
// If an awaitData getMore is killed during this process due to our max time expiring at
// an interrupt point, we just continue as normal and return rather than reporting a
// timeout to the user.
BSONObj obj;
try {
while (PlanExecutor::ADVANCED == (*state = exec->getNext(&obj, NULL))) {
// If adding this object will cause us to exceed the BSON size limit, then we
// stash it for later.
if (nextBatch->len() + obj.objsize() > BSONObjMaxUserSize && *numResults > 0) {
exec->enqueue(obj);
break;
}
// Add result to output buffer.
nextBatch->append(obj);
(*numResults)++;
if (enoughForGetMore(request.batchSize.value_or(0),
*numResults, nextBatch->len())) {
break;
}
}
}
catch (const UserException& except) {
if (isAwaitData && except.getCode() == ErrorCodes::ExceededTimeLimit) {
// We ignore exceptions from interrupt points due to max time expiry for
// awaitData cursors.
}
else {
throw;
}
}
if (PlanExecutor::FAILURE == *state) {
const std::unique_ptr stats(exec->getStats());
error() << "GetMore executor error, stats: " << Explain::statsToBSON(*stats);
return Status(ErrorCodes::OperationFailed,
str::stream() << "GetMore executor error: "
<< WorkingSetCommon::toStatusString(obj));
}
else if (PlanExecutor::DEAD == *state) {
return Status(ErrorCodes::OperationFailed,
str::stream() << "Plan executor killed during getMore command, "
<< "ns: " << request.nss.ns());
}
return Status::OK();
}
/**
* Called via a ScopeGuard on early return in order to ensure that the ClientCursor gets
* cleaned up properly.
*/
static void cleanupCursor(OperationContext* txn,
ClientCursorPin* ccPin,
const GetMoreRequest& request) {
ClientCursor* cursor = ccPin->c();
std::unique_ptr unpinDBLock;
std::unique_ptr unpinCollLock;
if (cursor->isAggCursor()) {
unpinDBLock.reset(new Lock::DBLock(txn->lockState(), request.nss.db(), MODE_IS));
unpinCollLock.reset(
new Lock::CollectionLock(txn->lockState(), request.nss.ns(), MODE_IS));
}
ccPin->deleteUnderlying();
}
} getMoreCmd;
} // namespace mongo