diff options
-rw-r--r-- | src/mongo/db/clientcursor.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.h | 44 | ||||
-rw-r--r-- | src/mongo/db/commands/pipeline_command.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/dbdirectclient.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/instance.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/instance.h | 3 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 16 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/operation_context_impl.h | 8 | ||||
-rw-r--r-- | src/mongo/db/operation_context_noop.h | 10 | ||||
-rw-r--r-- | src/mongo/db/query/new_find.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/query/new_find.h | 6 | ||||
-rw-r--r-- | src/mongo/dbtests/querytests.cpp | 2 |
13 files changed, 210 insertions, 21 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index ca661a0bfdf..17315bf566c 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -72,7 +72,9 @@ namespace mongo { ClientCursor::ClientCursor(const Collection* collection, PlanExecutor* exec, int qopts, const BSONObj query) : _collection( collection ), - _countedYet( false ) { + _countedYet( false ), + _unownedRU(NULL) { + _exec.reset(exec); _ns = exec->ns(); _query = query; @@ -87,7 +89,8 @@ namespace mongo { : _ns(collection->ns().ns()), _collection(collection), _countedYet( false ), - _queryOptions(QueryOption_NoCursorTimeout) { + _queryOptions(QueryOption_NoCursorTimeout), + _unownedRU(NULL) { init(); } @@ -176,6 +179,30 @@ namespace mongo { } // + // Storage engine state for getMore. + // + + void ClientCursor::setUnownedRecoveryUnit(RecoveryUnit* ru) { + invariant(!_unownedRU); + invariant(!_ownedRU.get()); + _unownedRU = ru; + } + + RecoveryUnit* ClientCursor::getUnownedRecoveryUnit() const { + return _unownedRU; + } + + void ClientCursor::setOwnedRecoveryUnit(RecoveryUnit* ru) { + invariant(!_unownedRU); + invariant(!_ownedRU.get()); + _ownedRU.reset(ru); + } + + RecoveryUnit* ClientCursor::releaseOwnedRecoveryUnit() { + return _ownedRU.release(); + } + + // // Pin methods // TODO: Simplify when we kill Cursor. In particular, once we've pinned a CC, it won't be // deleted from underneath us, so we can save the pointer and ignore the ID. diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 6eaafd7d8d1..0ebdfeff161 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -47,6 +47,7 @@ namespace mongo { class Database; class NamespaceDetails; class ParsedQuery; + class RecoveryUnit; typedef long long CursorId; /* passed to the client so it can send back on getMore */ static const CursorId INVALID_CURSOR_ID = -1; // But see SERVER-5726. @@ -148,6 +149,45 @@ namespace mongo { static long long totalOpen(); + // + // Storage engine state for getMore. + // + + /** + * + * If a ClientCursor is created via DBDirectClient, it uses the same storage engine + * context as the DBDirectClient caller. We store this context in _unownedRU. We use + * this to verify that all further callers use the same RecoveryUnit. + * + * Once a ClientCursor has an unowned RecoveryUnit, it will always have one. + * + * Sets the unowned RecoveryUnit to 'ru'. Does NOT take ownership of the pointer. + */ + void setUnownedRecoveryUnit(RecoveryUnit* ru); + + /** + * Return the unowned RecoveryUnit. 'this' does not own pointer and therefore cannot + * transfer ownership. + */ + RecoveryUnit* getUnownedRecoveryUnit() const; + + /** + * If a ClientCursor is created via a client request, we bind its lifetime to the + * ClientCursor's by storing it un _ownedRU. In order to execute the query over repeated + * network requests, we have to keep the execution state around. + */ + + /** + * Set the owned recovery unit to 'ru'. Takes ownership of it. If there is a previous + * owned recovery unit, it is deleted. + */ + void setOwnedRecoveryUnit(RecoveryUnit* ru); + + /** + * Returns the owned recovery unit. Ownership is transferred to the caller. + */ + RecoveryUnit* releaseOwnedRecoveryUnit(); + private: friend class ClientCursorMonitor; friend class CmdCursorInfo; @@ -209,6 +249,10 @@ namespace mongo { // The underlying execution machinery. // scoped_ptr<PlanExecutor> _exec; + + // Only one of these is not-NULL. + RecoveryUnit* _unownedRU; + std::auto_ptr<RecoveryUnit> _ownedRU; }; /** diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 12f051e1b49..0e6d70768c6 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -39,6 +39,7 @@ #include "mongo/db/curop.h" #include "mongo/db/commands.h" #include "mongo/db/exec/pipeline_proxy.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/pipeline/accumulator.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" @@ -142,6 +143,12 @@ namespace mongo { // If a time limit was set on the pipeline, remaining time is "rolled over" to the // cursor (for use by future getmore ops). cursor->setLeftoverMaxTimeMicros( txn->getCurOp()->getRemainingMaxTimeMicros() ); + + // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent + // getMore requests. The calling OpCtx gets a fresh RecoveryUnit. + cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); + StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine(); + txn->setRecoveryUnit(storageEngine->newRecoveryUnit(txn)); } BSONObjBuilder cursorObj(result.subobjStart("cursor")); @@ -250,6 +257,7 @@ namespace mongo { } if (collection) { + // XXX ClientCursor* cursor = new ClientCursor(collection, execHolder.release()); cursor->isAggCursor = true; // enable special locking behavior pin.reset(new ClientCursorPin(collection, cursor->cursorid())); diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp index 7d43034ad58..8be0a50519d 100644 --- a/src/mongo/db/dbdirectclient.cpp +++ b/src/mongo/db/dbdirectclient.cpp @@ -121,7 +121,7 @@ namespace mongo { } DbResponse dbResponse; - assembleResponse(_txn, toSend, dbResponse, dummyHost); + assembleResponse(_txn, toSend, dbResponse, dummyHost, true); verify(dbResponse.response); // can get rid of this if we make response handling smarter @@ -138,7 +138,7 @@ namespace mongo { } DbResponse dbResponse; - assembleResponse(_txn, toSend, dbResponse, dummyHost); + assembleResponse(_txn, toSend, dbResponse, dummyHost, true); } auto_ptr<DBClientCursor> DBDirectClient::query(const string& ns, diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index e463a9cfdf2..77516e8f617 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -107,7 +107,11 @@ namespace mongo { void receivedUpdate(OperationContext* txn, Message& m, CurOp& op); void receivedDelete(OperationContext* txn, Message& m, CurOp& op); void receivedInsert(OperationContext* txn, Message& m, CurOp& op); - bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop ); + bool receivedGetMore(OperationContext* txn, + DbResponse& dbresponse, + Message& m, + CurOp& curop, + bool fromDBDirectClient); int nloggedsome = 0; #define LOGWITHRATELIMIT if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 ) @@ -239,7 +243,11 @@ namespace mongo { replyToQuery(0, m, dbresponse, obj); } - static bool receivedQuery(OperationContext* txn, Client& c, DbResponse& dbresponse, Message& m ) { + static bool receivedQuery(OperationContext* txn, + Client& c, + DbResponse& dbresponse, + Message& m, + bool fromDBDirectClient) { bool ok = true; MSGID responseTo = m.header().getId(); @@ -260,7 +268,7 @@ namespace mongo { audit::logQueryAuthzCheck(client, ns, q.query, status.code()); uassertStatusOK(status); } - dbresponse.exhaustNS = newRunQuery(txn, m, q, op, *resp); + dbresponse.exhaustNS = newRunQuery(txn, m, q, op, *resp, fromDBDirectClient); verify( !resp->empty() ); } catch ( SendStaleConfigException& e ){ @@ -339,7 +347,8 @@ namespace mongo { void assembleResponse( OperationContext* txn, Message& m, DbResponse& dbresponse, - const HostAndPort& remote ) { + const HostAndPort& remote, + bool fromDBDirectClient ) { // before we lock... int op = m.operation(); bool isCommand = false; @@ -434,10 +443,10 @@ namespace mongo { if ( op == dbQuery ) { if (!checkShardVersion(m, &dbresponse)) return; - receivedQuery(txn, c , dbresponse, m ); + receivedQuery(txn, c , dbresponse, m, fromDBDirectClient ); } else if ( op == dbGetMore ) { - if ( ! receivedGetMore(txn, dbresponse, m, currentOp) ) + if ( ! receivedGetMore(txn, dbresponse, m, currentOp, fromDBDirectClient) ) shouldLog = true; } else if ( op == dbMsg ) { @@ -658,7 +667,11 @@ namespace mongo { QueryResult::View emptyMoreResult(long long); - bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop ) { + bool receivedGetMore(OperationContext* txn, + DbResponse& dbresponse, + Message& m, + CurOp& curop, + bool fromDBDirectClient) { bool ok = true; DbMessage d(m); @@ -708,7 +721,8 @@ namespace mongo { curop, pass, exhaust, - &isCursorAuthorized); + &isCursorAuthorized, + fromDBDirectClient); } catch ( AssertionException& e ) { if ( isCursorAuthorized ) { diff --git a/src/mongo/db/instance.h b/src/mongo/db/instance.h index 3b0453fddec..8b14a0e37e3 100644 --- a/src/mongo/db/instance.h +++ b/src/mongo/db/instance.h @@ -71,7 +71,8 @@ namespace mongo { void assembleResponse( OperationContext* txn, Message& m, DbResponse& dbresponse, - const HostAndPort &client ); + const HostAndPort &client, + bool fromDBDirectClient = false ); void maybeCreatePidFile(); diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 5883b601d2a..e4efd8bc6b3 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -62,6 +62,22 @@ namespace mongo { virtual RecoveryUnit* recoveryUnit() const = 0; /** + * Returns the RecoveryUnit (same return value as recoveryUnit()) but the caller takes + * ownership of the returned RecoveryUnit, and the OperationContext instance relinquishes + * ownership. Sets the RecoveryUnit to NULL. + * + * Used to transfer ownership of storage engine state from OperationContext + * to ClientCursor for getMore-able queries. + * + * Note that we don't allow the top-level locks to be stored across getMore. + * We rely on active cursors being killed when collections or databases are dropped, + * or when collection metadata changes. + */ + virtual RecoveryUnit* releaseRecoveryUnit() = 0; + + virtual void setRecoveryUnit(RecoveryUnit* unit) = 0; + + /** * Interface for locking. Caller DOES NOT own pointer. */ virtual Locker* lockState() const = 0; diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index 1f482f23127..596d9d4a1a7 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -61,6 +61,14 @@ namespace mongo { return _recovery.get(); } + RecoveryUnit* OperationContextImpl::releaseRecoveryUnit() { + return _recovery.release(); + } + + void OperationContextImpl::setRecoveryUnit(RecoveryUnit* unit) { + _recovery.reset(unit); + } + Locker* OperationContextImpl::lockState() const { return _locker.get(); } diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h index c178936271f..e9c43c08928 100644 --- a/src/mongo/db/operation_context_impl.h +++ b/src/mongo/db/operation_context_impl.h @@ -43,6 +43,10 @@ namespace mongo { virtual RecoveryUnit* recoveryUnit() const; + virtual RecoveryUnit* releaseRecoveryUnit(); + + virtual void setRecoveryUnit(RecoveryUnit* unit); + virtual Locker* lockState() const; virtual ProgressMeter* setMessage(const char* msg, @@ -69,8 +73,10 @@ namespace mongo { virtual Transaction* getTransaction(); private: - boost::scoped_ptr<RecoveryUnit> _recovery; + std::auto_ptr<RecoveryUnit> _recovery; + Transaction _tx; + boost::scoped_ptr<Locker> _locker; }; diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h index 7aca695854d..ea991a855ad 100644 --- a/src/mongo/db/operation_context_noop.h +++ b/src/mongo/db/operation_context_noop.h @@ -61,6 +61,14 @@ namespace mongo { return _recoveryUnit.get(); } + virtual RecoveryUnit* releaseRecoveryUnit() { + return _recoveryUnit.release(); + } + + virtual void setRecoveryUnit(RecoveryUnit* unit) { + _recoveryUnit.reset(unit); + } + virtual Locker* lockState() const { // TODO: This should return an actual object if necessary for testing. return NULL; @@ -101,7 +109,7 @@ namespace mongo { } private: - boost::scoped_ptr<RecoveryUnit> _recoveryUnit; + std::auto_ptr<RecoveryUnit> _recoveryUnit; }; } // namespace mongo diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp index e76a65cf55a..7b248e2428b 100644 --- a/src/mongo/db/query/new_find.cpp +++ b/src/mongo/db/query/new_find.cpp @@ -38,6 +38,7 @@ #include "mongo/db/exec/filter.h" #include "mongo/db/exec/oplogstart.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/global_environment_experiment.h" #include "mongo/db/keypattern.h" #include "mongo/db/query/explain.h" #include "mongo/db/query/find_constants.h" @@ -136,8 +137,30 @@ namespace mongo { return true; } + struct ScopedRecoveryUnitSwapper { + explicit ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn) + : _cc(cc), _txn(txn) { + + // Save this for later. We restore it upon destruction. + _txnPreviousRecoveryUnit = txn->releaseRecoveryUnit(); + + // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx. + RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit(); + txn->setRecoveryUnit(ccRecoveryUnit); + } + + ~ScopedRecoveryUnitSwapper() { + _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit()); + _txn->setRecoveryUnit(_txnPreviousRecoveryUnit); + } + + ClientCursor* _cc; + OperationContext* _txn; + RecoveryUnit* _txnPreviousRecoveryUnit; + }; + /** - * Also called by db/ops/query.cpp. This is the new getMore entry point. + * Called by db/instance.cpp. This is the getMore entry point. * * pass - when QueryOption_AwaitData is in use, the caller will make repeated calls * when this method returns an empty result, incrementing pass on each call. @@ -150,7 +173,8 @@ namespace mongo { CurOp& curop, int pass, bool& exhaust, - bool* isCursorAuthorized) { + bool* isCursorAuthorized, + bool fromDBDirectClient) { // For testing, we may want to fail if we receive a getmore. if (MONGO_FAIL_POINT(failReceivedGetmore)) { @@ -182,6 +206,15 @@ namespace mongo { ClientCursorPin ccPin(collection, cursorid); ClientCursor* cc = ccPin.c(); + // If we're not being called from DBDirectClient we want to associate the RecoveryUnit + // used to create the execution machinery inside the cursor with our OperationContext. + // If we throw or otherwise exit this method in a disorderly fashion, we must ensure + // that further calls to getMore won't fail, and that the provided OperationContext + // has a valid RecoveryUnit. As such, we use RAII to accomplish this. + // + // This must be destroyed before the ClientCursor is destroyed. + std::auto_ptr<ScopedRecoveryUnitSwapper> ruSwapper; + // These are set in the QueryResult msg we return. int resultFlags = ResultFlag_AwaitCapable; @@ -204,6 +237,15 @@ namespace mongo { uassert(17011, "auth error", str::equals(ns, cc->ns().c_str())); *isCursorAuthorized = true; + // Restore the RecoveryUnit if we need to. + if (fromDBDirectClient) { + invariant(txn->recoveryUnit() == cc->getUnownedRecoveryUnit()); + } + else { + // Swap RecoveryUnit(s) between the ClientCursor and OperationContext. + ruSwapper.reset(new ScopedRecoveryUnitSwapper(cc, txn)); + } + // Reset timeout timer on the cursor since the cursor is still in use. cc->setIdleTime(0); @@ -313,6 +355,7 @@ namespace mongo { } if (!saveClientCursor) { + ruSwapper.reset(); ccPin.deleteUnderlying(); // cc is now invalid, as is the executor cursorid = 0; @@ -434,7 +477,8 @@ namespace mongo { Message& m, QueryMessage& q, CurOp& curop, - Message &result) { + Message &result, + bool fromDBDirectClient) { // Validate the namespace. const char *ns = q.ns; uassert(16332, "can't have an empty ns", ns[0]); @@ -755,6 +799,17 @@ namespace mongo { cq->getParsed().getFilter()); ccId = cc->cursorid(); + if (fromDBDirectClient) { + cc->setUnownedRecoveryUnit(txn->recoveryUnit()); + } + else { + // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent + // getMore requests. The calling OpCtx gets a fresh RecoveryUnit. + cc->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); + StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine(); + txn->setRecoveryUnit(storageEngine->newRecoveryUnit(txn)); + } + QLOG() << "caching executor with cursorid " << ccId << " after returning " << numResults << " results" << endl; diff --git a/src/mongo/db/query/new_find.h b/src/mongo/db/query/new_find.h index c45f06a3f0c..6a020e2e07d 100644 --- a/src/mongo/db/query/new_find.h +++ b/src/mongo/db/query/new_find.h @@ -65,7 +65,8 @@ namespace mongo { CurOp& curop, int pass, bool& exhaust, - bool* isCursorAuthorized); + bool* isCursorAuthorized, + bool fromDBDirectClient); /** * Run the query 'q' and place the result in 'result'. @@ -74,6 +75,7 @@ namespace mongo { Message& m, QueryMessage& q, CurOp& curop, - Message &result); + Message &result, + bool fromDBDirectClient); } // namespace mongo diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index 865e71ed153..aaeaf6c6817 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -1432,7 +1432,7 @@ namespace QueryTests { DbMessage dbMessage( message ); QueryMessage queryMessage( dbMessage ); Message result; - string exhaust = newRunQuery( &_txn, message, queryMessage, *cc().curop(), result ); + string exhaust = newRunQuery( &_txn, message, queryMessage, *cc().curop(), result, false ); ASSERT( exhaust.size() ); ASSERT_EQUALS( string( ns() ), exhaust ); } |