summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/clientcursor.cpp31
-rw-r--r--src/mongo/db/clientcursor.h44
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp8
-rw-r--r--src/mongo/db/dbdirectclient.cpp4
-rw-r--r--src/mongo/db/instance.cpp30
-rw-r--r--src/mongo/db/instance.h3
-rw-r--r--src/mongo/db/operation_context.h16
-rw-r--r--src/mongo/db/operation_context_impl.cpp8
-rw-r--r--src/mongo/db/operation_context_impl.h8
-rw-r--r--src/mongo/db/operation_context_noop.h10
-rw-r--r--src/mongo/db/query/new_find.cpp61
-rw-r--r--src/mongo/db/query/new_find.h6
-rw-r--r--src/mongo/dbtests/querytests.cpp2
13 files changed, 210 insertions, 21 deletions
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index ca661a0bfdf..17315bf566c 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -72,7 +72,9 @@ namespace mongo {
ClientCursor::ClientCursor(const Collection* collection, PlanExecutor* exec,
int qopts, const BSONObj query)
: _collection( collection ),
- _countedYet( false ) {
+ _countedYet( false ),
+ _unownedRU(NULL) {
+
_exec.reset(exec);
_ns = exec->ns();
_query = query;
@@ -87,7 +89,8 @@ namespace mongo {
: _ns(collection->ns().ns()),
_collection(collection),
_countedYet( false ),
- _queryOptions(QueryOption_NoCursorTimeout) {
+ _queryOptions(QueryOption_NoCursorTimeout),
+ _unownedRU(NULL) {
init();
}
@@ -176,6 +179,30 @@ namespace mongo {
}
//
+ // Storage engine state for getMore.
+ //
+
+ void ClientCursor::setUnownedRecoveryUnit(RecoveryUnit* ru) {
+ invariant(!_unownedRU);
+ invariant(!_ownedRU.get());
+ _unownedRU = ru;
+ }
+
+ RecoveryUnit* ClientCursor::getUnownedRecoveryUnit() const {
+ return _unownedRU;
+ }
+
+ void ClientCursor::setOwnedRecoveryUnit(RecoveryUnit* ru) {
+ invariant(!_unownedRU);
+ invariant(!_ownedRU.get());
+ _ownedRU.reset(ru);
+ }
+
+ RecoveryUnit* ClientCursor::releaseOwnedRecoveryUnit() {
+ return _ownedRU.release();
+ }
+
+ //
// Pin methods
// TODO: Simplify when we kill Cursor. In particular, once we've pinned a CC, it won't be
// deleted from underneath us, so we can save the pointer and ignore the ID.
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 6eaafd7d8d1..0ebdfeff161 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -47,6 +47,7 @@ namespace mongo {
class Database;
class NamespaceDetails;
class ParsedQuery;
+ class RecoveryUnit;
typedef long long CursorId; /* passed to the client so it can send back on getMore */
static const CursorId INVALID_CURSOR_ID = -1; // But see SERVER-5726.
@@ -148,6 +149,45 @@ namespace mongo {
static long long totalOpen();
+ //
+ // Storage engine state for getMore.
+ //
+
+ /**
+ *
+ * If a ClientCursor is created via DBDirectClient, it uses the same storage engine
+ * context as the DBDirectClient caller. We store this context in _unownedRU. We use
+ * this to verify that all further callers use the same RecoveryUnit.
+ *
+ * Once a ClientCursor has an unowned RecoveryUnit, it will always have one.
+ *
+ * Sets the unowned RecoveryUnit to 'ru'. Does NOT take ownership of the pointer.
+ */
+ void setUnownedRecoveryUnit(RecoveryUnit* ru);
+
+ /**
+ * Return the unowned RecoveryUnit. 'this' does not own pointer and therefore cannot
+ * transfer ownership.
+ */
+ RecoveryUnit* getUnownedRecoveryUnit() const;
+
+ /**
+ * If a ClientCursor is created via a client request, we bind its lifetime to the
+ * ClientCursor's by storing it un _ownedRU. In order to execute the query over repeated
+ * network requests, we have to keep the execution state around.
+ */
+
+ /**
+ * Set the owned recovery unit to 'ru'. Takes ownership of it. If there is a previous
+ * owned recovery unit, it is deleted.
+ */
+ void setOwnedRecoveryUnit(RecoveryUnit* ru);
+
+ /**
+ * Returns the owned recovery unit. Ownership is transferred to the caller.
+ */
+ RecoveryUnit* releaseOwnedRecoveryUnit();
+
private:
friend class ClientCursorMonitor;
friend class CmdCursorInfo;
@@ -209,6 +249,10 @@ namespace mongo {
// The underlying execution machinery.
//
scoped_ptr<PlanExecutor> _exec;
+
+ // Only one of these is not-NULL.
+ RecoveryUnit* _unownedRU;
+ std::auto_ptr<RecoveryUnit> _ownedRU;
};
/**
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 12f051e1b49..0e6d70768c6 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/commands.h"
#include "mongo/db/exec/pipeline_proxy.h"
+#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/pipeline/accumulator.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
@@ -142,6 +143,12 @@ namespace mongo {
// If a time limit was set on the pipeline, remaining time is "rolled over" to the
// cursor (for use by future getmore ops).
cursor->setLeftoverMaxTimeMicros( txn->getCurOp()->getRemainingMaxTimeMicros() );
+
+ // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent
+ // getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
+ cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
+ StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine();
+ txn->setRecoveryUnit(storageEngine->newRecoveryUnit(txn));
}
BSONObjBuilder cursorObj(result.subobjStart("cursor"));
@@ -250,6 +257,7 @@ namespace mongo {
}
if (collection) {
+ // XXX
ClientCursor* cursor = new ClientCursor(collection, execHolder.release());
cursor->isAggCursor = true; // enable special locking behavior
pin.reset(new ClientCursorPin(collection, cursor->cursorid()));
diff --git a/src/mongo/db/dbdirectclient.cpp b/src/mongo/db/dbdirectclient.cpp
index 7d43034ad58..8be0a50519d 100644
--- a/src/mongo/db/dbdirectclient.cpp
+++ b/src/mongo/db/dbdirectclient.cpp
@@ -121,7 +121,7 @@ namespace mongo {
}
DbResponse dbResponse;
- assembleResponse(_txn, toSend, dbResponse, dummyHost);
+ assembleResponse(_txn, toSend, dbResponse, dummyHost, true);
verify(dbResponse.response);
// can get rid of this if we make response handling smarter
@@ -138,7 +138,7 @@ namespace mongo {
}
DbResponse dbResponse;
- assembleResponse(_txn, toSend, dbResponse, dummyHost);
+ assembleResponse(_txn, toSend, dbResponse, dummyHost, true);
}
auto_ptr<DBClientCursor> DBDirectClient::query(const string& ns,
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index e463a9cfdf2..77516e8f617 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -107,7 +107,11 @@ namespace mongo {
void receivedUpdate(OperationContext* txn, Message& m, CurOp& op);
void receivedDelete(OperationContext* txn, Message& m, CurOp& op);
void receivedInsert(OperationContext* txn, Message& m, CurOp& op);
- bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop );
+ bool receivedGetMore(OperationContext* txn,
+ DbResponse& dbresponse,
+ Message& m,
+ CurOp& curop,
+ bool fromDBDirectClient);
int nloggedsome = 0;
#define LOGWITHRATELIMIT if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
@@ -239,7 +243,11 @@ namespace mongo {
replyToQuery(0, m, dbresponse, obj);
}
- static bool receivedQuery(OperationContext* txn, Client& c, DbResponse& dbresponse, Message& m ) {
+ static bool receivedQuery(OperationContext* txn,
+ Client& c,
+ DbResponse& dbresponse,
+ Message& m,
+ bool fromDBDirectClient) {
bool ok = true;
MSGID responseTo = m.header().getId();
@@ -260,7 +268,7 @@ namespace mongo {
audit::logQueryAuthzCheck(client, ns, q.query, status.code());
uassertStatusOK(status);
}
- dbresponse.exhaustNS = newRunQuery(txn, m, q, op, *resp);
+ dbresponse.exhaustNS = newRunQuery(txn, m, q, op, *resp, fromDBDirectClient);
verify( !resp->empty() );
}
catch ( SendStaleConfigException& e ){
@@ -339,7 +347,8 @@ namespace mongo {
void assembleResponse( OperationContext* txn,
Message& m,
DbResponse& dbresponse,
- const HostAndPort& remote ) {
+ const HostAndPort& remote,
+ bool fromDBDirectClient ) {
// before we lock...
int op = m.operation();
bool isCommand = false;
@@ -434,10 +443,10 @@ namespace mongo {
if ( op == dbQuery ) {
if (!checkShardVersion(m, &dbresponse))
return;
- receivedQuery(txn, c , dbresponse, m );
+ receivedQuery(txn, c , dbresponse, m, fromDBDirectClient );
}
else if ( op == dbGetMore ) {
- if ( ! receivedGetMore(txn, dbresponse, m, currentOp) )
+ if ( ! receivedGetMore(txn, dbresponse, m, currentOp, fromDBDirectClient) )
shouldLog = true;
}
else if ( op == dbMsg ) {
@@ -658,7 +667,11 @@ namespace mongo {
QueryResult::View emptyMoreResult(long long);
- bool receivedGetMore(OperationContext* txn, DbResponse& dbresponse, Message& m, CurOp& curop ) {
+ bool receivedGetMore(OperationContext* txn,
+ DbResponse& dbresponse,
+ Message& m,
+ CurOp& curop,
+ bool fromDBDirectClient) {
bool ok = true;
DbMessage d(m);
@@ -708,7 +721,8 @@ namespace mongo {
curop,
pass,
exhaust,
- &isCursorAuthorized);
+ &isCursorAuthorized,
+ fromDBDirectClient);
}
catch ( AssertionException& e ) {
if ( isCursorAuthorized ) {
diff --git a/src/mongo/db/instance.h b/src/mongo/db/instance.h
index 3b0453fddec..8b14a0e37e3 100644
--- a/src/mongo/db/instance.h
+++ b/src/mongo/db/instance.h
@@ -71,7 +71,8 @@ namespace mongo {
void assembleResponse( OperationContext* txn,
Message& m,
DbResponse& dbresponse,
- const HostAndPort &client );
+ const HostAndPort &client,
+ bool fromDBDirectClient = false );
void maybeCreatePidFile();
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 5883b601d2a..e4efd8bc6b3 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -62,6 +62,22 @@ namespace mongo {
virtual RecoveryUnit* recoveryUnit() const = 0;
/**
+ * Returns the RecoveryUnit (same return value as recoveryUnit()) but the caller takes
+ * ownership of the returned RecoveryUnit, and the OperationContext instance relinquishes
+ * ownership. Sets the RecoveryUnit to NULL.
+ *
+ * Used to transfer ownership of storage engine state from OperationContext
+ * to ClientCursor for getMore-able queries.
+ *
+ * Note that we don't allow the top-level locks to be stored across getMore.
+ * We rely on active cursors being killed when collections or databases are dropped,
+ * or when collection metadata changes.
+ */
+ virtual RecoveryUnit* releaseRecoveryUnit() = 0;
+
+ virtual void setRecoveryUnit(RecoveryUnit* unit) = 0;
+
+ /**
* Interface for locking. Caller DOES NOT own pointer.
*/
virtual Locker* lockState() const = 0;
diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp
index 1f482f23127..596d9d4a1a7 100644
--- a/src/mongo/db/operation_context_impl.cpp
+++ b/src/mongo/db/operation_context_impl.cpp
@@ -61,6 +61,14 @@ namespace mongo {
return _recovery.get();
}
+ RecoveryUnit* OperationContextImpl::releaseRecoveryUnit() {
+ return _recovery.release();
+ }
+
+ void OperationContextImpl::setRecoveryUnit(RecoveryUnit* unit) {
+ _recovery.reset(unit);
+ }
+
Locker* OperationContextImpl::lockState() const {
return _locker.get();
}
diff --git a/src/mongo/db/operation_context_impl.h b/src/mongo/db/operation_context_impl.h
index c178936271f..e9c43c08928 100644
--- a/src/mongo/db/operation_context_impl.h
+++ b/src/mongo/db/operation_context_impl.h
@@ -43,6 +43,10 @@ namespace mongo {
virtual RecoveryUnit* recoveryUnit() const;
+ virtual RecoveryUnit* releaseRecoveryUnit();
+
+ virtual void setRecoveryUnit(RecoveryUnit* unit);
+
virtual Locker* lockState() const;
virtual ProgressMeter* setMessage(const char* msg,
@@ -69,8 +73,10 @@ namespace mongo {
virtual Transaction* getTransaction();
private:
- boost::scoped_ptr<RecoveryUnit> _recovery;
+ std::auto_ptr<RecoveryUnit> _recovery;
+
Transaction _tx;
+
boost::scoped_ptr<Locker> _locker;
};
diff --git a/src/mongo/db/operation_context_noop.h b/src/mongo/db/operation_context_noop.h
index 7aca695854d..ea991a855ad 100644
--- a/src/mongo/db/operation_context_noop.h
+++ b/src/mongo/db/operation_context_noop.h
@@ -61,6 +61,14 @@ namespace mongo {
return _recoveryUnit.get();
}
+ virtual RecoveryUnit* releaseRecoveryUnit() {
+ return _recoveryUnit.release();
+ }
+
+ virtual void setRecoveryUnit(RecoveryUnit* unit) {
+ _recoveryUnit.reset(unit);
+ }
+
virtual Locker* lockState() const {
// TODO: This should return an actual object if necessary for testing.
return NULL;
@@ -101,7 +109,7 @@ namespace mongo {
}
private:
- boost::scoped_ptr<RecoveryUnit> _recoveryUnit;
+ std::auto_ptr<RecoveryUnit> _recoveryUnit;
};
} // namespace mongo
diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp
index e76a65cf55a..7b248e2428b 100644
--- a/src/mongo/db/query/new_find.cpp
+++ b/src/mongo/db/query/new_find.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/exec/filter.h"
#include "mongo/db/exec/oplogstart.h"
#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/query/explain.h"
#include "mongo/db/query/find_constants.h"
@@ -136,8 +137,30 @@ namespace mongo {
return true;
}
+ struct ScopedRecoveryUnitSwapper {
+ explicit ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn)
+ : _cc(cc), _txn(txn) {
+
+ // Save this for later. We restore it upon destruction.
+ _txnPreviousRecoveryUnit = txn->releaseRecoveryUnit();
+
+ // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx.
+ RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit();
+ txn->setRecoveryUnit(ccRecoveryUnit);
+ }
+
+ ~ScopedRecoveryUnitSwapper() {
+ _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit());
+ _txn->setRecoveryUnit(_txnPreviousRecoveryUnit);
+ }
+
+ ClientCursor* _cc;
+ OperationContext* _txn;
+ RecoveryUnit* _txnPreviousRecoveryUnit;
+ };
+
/**
- * Also called by db/ops/query.cpp. This is the new getMore entry point.
+ * Called by db/instance.cpp. This is the getMore entry point.
*
* pass - when QueryOption_AwaitData is in use, the caller will make repeated calls
* when this method returns an empty result, incrementing pass on each call.
@@ -150,7 +173,8 @@ namespace mongo {
CurOp& curop,
int pass,
bool& exhaust,
- bool* isCursorAuthorized) {
+ bool* isCursorAuthorized,
+ bool fromDBDirectClient) {
// For testing, we may want to fail if we receive a getmore.
if (MONGO_FAIL_POINT(failReceivedGetmore)) {
@@ -182,6 +206,15 @@ namespace mongo {
ClientCursorPin ccPin(collection, cursorid);
ClientCursor* cc = ccPin.c();
+ // If we're not being called from DBDirectClient we want to associate the RecoveryUnit
+ // used to create the execution machinery inside the cursor with our OperationContext.
+ // If we throw or otherwise exit this method in a disorderly fashion, we must ensure
+ // that further calls to getMore won't fail, and that the provided OperationContext
+ // has a valid RecoveryUnit. As such, we use RAII to accomplish this.
+ //
+ // This must be destroyed before the ClientCursor is destroyed.
+ std::auto_ptr<ScopedRecoveryUnitSwapper> ruSwapper;
+
// These are set in the QueryResult msg we return.
int resultFlags = ResultFlag_AwaitCapable;
@@ -204,6 +237,15 @@ namespace mongo {
uassert(17011, "auth error", str::equals(ns, cc->ns().c_str()));
*isCursorAuthorized = true;
+ // Restore the RecoveryUnit if we need to.
+ if (fromDBDirectClient) {
+ invariant(txn->recoveryUnit() == cc->getUnownedRecoveryUnit());
+ }
+ else {
+ // Swap RecoveryUnit(s) between the ClientCursor and OperationContext.
+ ruSwapper.reset(new ScopedRecoveryUnitSwapper(cc, txn));
+ }
+
// Reset timeout timer on the cursor since the cursor is still in use.
cc->setIdleTime(0);
@@ -313,6 +355,7 @@ namespace mongo {
}
if (!saveClientCursor) {
+ ruSwapper.reset();
ccPin.deleteUnderlying();
// cc is now invalid, as is the executor
cursorid = 0;
@@ -434,7 +477,8 @@ namespace mongo {
Message& m,
QueryMessage& q,
CurOp& curop,
- Message &result) {
+ Message &result,
+ bool fromDBDirectClient) {
// Validate the namespace.
const char *ns = q.ns;
uassert(16332, "can't have an empty ns", ns[0]);
@@ -755,6 +799,17 @@ namespace mongo {
cq->getParsed().getFilter());
ccId = cc->cursorid();
+ if (fromDBDirectClient) {
+ cc->setUnownedRecoveryUnit(txn->recoveryUnit());
+ }
+ else {
+ // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent
+ // getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
+ cc->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
+ StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine();
+ txn->setRecoveryUnit(storageEngine->newRecoveryUnit(txn));
+ }
+
QLOG() << "caching executor with cursorid " << ccId
<< " after returning " << numResults << " results" << endl;
diff --git a/src/mongo/db/query/new_find.h b/src/mongo/db/query/new_find.h
index c45f06a3f0c..6a020e2e07d 100644
--- a/src/mongo/db/query/new_find.h
+++ b/src/mongo/db/query/new_find.h
@@ -65,7 +65,8 @@ namespace mongo {
CurOp& curop,
int pass,
bool& exhaust,
- bool* isCursorAuthorized);
+ bool* isCursorAuthorized,
+ bool fromDBDirectClient);
/**
* Run the query 'q' and place the result in 'result'.
@@ -74,6 +75,7 @@ namespace mongo {
Message& m,
QueryMessage& q,
CurOp& curop,
- Message &result);
+ Message &result,
+ bool fromDBDirectClient);
} // namespace mongo
diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp
index 865e71ed153..aaeaf6c6817 100644
--- a/src/mongo/dbtests/querytests.cpp
+++ b/src/mongo/dbtests/querytests.cpp
@@ -1432,7 +1432,7 @@ namespace QueryTests {
DbMessage dbMessage( message );
QueryMessage queryMessage( dbMessage );
Message result;
- string exhaust = newRunQuery( &_txn, message, queryMessage, *cc().curop(), result );
+ string exhaust = newRunQuery( &_txn, message, queryMessage, *cc().curop(), result, false );
ASSERT( exhaust.size() );
ASSERT_EQUALS( string( ns() ), exhaust );
}