summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2015-07-06 19:16:30 -0400
committerMathias Stearn <mathias@10gen.com>2015-07-16 14:37:02 -0400
commitb9d2e18ca68246e5d21ed42a846ff4094867f159 (patch)
treecdbbac6dc5ee00404cf6452f5dd70612983127e3 /src
parentc832bc753c29f91597b75fa02c0d9019c3c20b0f (diff)
downloadmongo-b9d2e18ca68246e5d21ed42a846ff4094867f159.tar.gz
SERVER-17364 Don't stash RecoveryUnits across getMores
We now tell PlanExecutors to detach from their OperationContexts and to shed all storage engine resources before stashing the ClientCursor. This is a heavier weight operation than a normal save/restoreState which is no longer allowed to change the OperationContext.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/catalog/capped_utils.cpp2
-rw-r--r--src/mongo/db/catalog/index_create.cpp2
-rw-r--r--src/mongo/db/clientcursor.cpp33
-rw-r--r--src/mongo/db/clientcursor.h60
-rw-r--r--src/mongo/db/commands/find_cmd.cpp24
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp22
-rw-r--r--src/mongo/db/commands/list_collections.cpp8
-rw-r--r--src/mongo/db/commands/list_indexes.cpp8
-rw-r--r--src/mongo/db/commands/mr.cpp4
-rw-r--r--src/mongo/db/commands/parallel_collection_scan.cpp51
-rw-r--r--src/mongo/db/commands/pipeline_command.cpp28
-rw-r--r--src/mongo/db/commands/repair_cursor.cpp6
-rw-r--r--src/mongo/db/dbcommands.cpp2
-rw-r--r--src/mongo/db/exec/cached_plan.cpp2
-rw-r--r--src/mongo/db/exec/cached_plan.h2
-rw-r--r--src/mongo/db/exec/collection_scan.cpp22
-rw-r--r--src/mongo/db/exec/collection_scan.h4
-rw-r--r--src/mongo/db/exec/count.cpp2
-rw-r--r--src/mongo/db/exec/count.h2
-rw-r--r--src/mongo/db/exec/count_scan.cpp21
-rw-r--r--src/mongo/db/exec/count_scan.h4
-rw-r--r--src/mongo/db/exec/delete.cpp9
-rw-r--r--src/mongo/db/exec/delete.h3
-rw-r--r--src/mongo/db/exec/distinct_scan.cpp18
-rw-r--r--src/mongo/db/exec/distinct_scan.h4
-rw-r--r--src/mongo/db/exec/fetch.cpp16
-rw-r--r--src/mongo/db/exec/fetch.h4
-rw-r--r--src/mongo/db/exec/group.cpp2
-rw-r--r--src/mongo/db/exec/group.h2
-rw-r--r--src/mongo/db/exec/idhack.cpp16
-rw-r--r--src/mongo/db/exec/idhack.h4
-rw-r--r--src/mongo/db/exec/index_scan.cpp21
-rw-r--r--src/mongo/db/exec/index_scan.h4
-rw-r--r--src/mongo/db/exec/multi_iterator.cpp26
-rw-r--r--src/mongo/db/exec/multi_iterator.h4
-rw-r--r--src/mongo/db/exec/multi_plan.cpp2
-rw-r--r--src/mongo/db/exec/multi_plan.h2
-rw-r--r--src/mongo/db/exec/near.cpp2
-rw-r--r--src/mongo/db/exec/near.h2
-rw-r--r--src/mongo/db/exec/oplogstart.cpp23
-rw-r--r--src/mongo/db/exec/oplogstart.h4
-rw-r--r--src/mongo/db/exec/pipeline_proxy.cpp10
-rw-r--r--src/mongo/db/exec/pipeline_proxy.h7
-rw-r--r--src/mongo/db/exec/plan_stage.cpp22
-rw-r--r--src/mongo/db/exec/plan_stage.h38
-rw-r--r--src/mongo/db/exec/queued_data_stage_test.cpp2
-rw-r--r--src/mongo/db/exec/subplan.cpp2
-rw-r--r--src/mongo/db/exec/subplan.h2
-rw-r--r--src/mongo/db/exec/text_or.cpp20
-rw-r--r--src/mongo/db/exec/text_or.h4
-rw-r--r--src/mongo/db/exec/update.cpp16
-rw-r--r--src/mongo/db/exec/update.h5
-rw-r--r--src/mongo/db/operation_context_impl.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp4
-rw-r--r--src/mongo/db/query/find.cpp98
-rw-r--r--src/mongo/db/query/find.h21
-rw-r--r--src/mongo/db/query/plan_executor.cpp45
-rw-r--r--src/mongo/db/query/plan_executor.h34
-rw-r--r--src/mongo/db/query/plan_yield_policy.cpp2
-rw-r--r--src/mongo/db/repair_database.cpp2
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.cpp4
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp13
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_record_store.cpp22
-rw-r--r--src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp17
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_base.h8
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp7
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h8
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h9
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp7
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h8
-rw-r--r--src/mongo/db/storage/record_store.h19
-rw-r--r--src/mongo/db/storage/record_store_test_recorditer.cpp6
-rw-r--r--src/mongo/db/storage/record_store_test_repairiter.cpp2
-rw-r--r--src/mongo/db/storage/recovery_unit.h10
-rw-r--r--src/mongo/db/storage/sorted_data_interface.h19
-rw-r--r--src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp16
-rw-r--r--src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp38
-rw-r--r--src/mongo/db/storage/sorted_data_interface_test_harness.cpp4
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp83
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp77
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp4
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp13
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h6
-rw-r--r--src/mongo/db/ttl.cpp2
-rw-r--r--src/mongo/dbtests/executor_registry.cpp14
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp6
-rw-r--r--src/mongo/dbtests/query_stage_and.cpp8
-rw-r--r--src/mongo/dbtests/query_stage_collscan.cpp4
-rw-r--r--src/mongo/dbtests/query_stage_count.cpp2
-rw-r--r--src/mongo/dbtests/query_stage_count_scan.cpp10
-rw-r--r--src/mongo/dbtests/query_stage_delete.cpp2
-rw-r--r--src/mongo/dbtests/query_stage_ixscan.cpp8
-rw-r--r--src/mongo/dbtests/query_stage_merge_sort.cpp2
-rw-r--r--src/mongo/dbtests/query_stage_sort.cpp8
-rw-r--r--src/mongo/dbtests/query_stage_update.cpp2
95 files changed, 662 insertions, 621 deletions
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp
index b1adb472565..e66cfa6b9e9 100644
--- a/src/mongo/db/catalog/capped_utils.cpp
+++ b/src/mongo/db/catalog/capped_utils.cpp
@@ -197,7 +197,7 @@ Status cloneCollectionAsCapped(OperationContext* txn,
// around call to abandonSnapshot.
exec->saveState();
txn->recoveryUnit()->abandonSnapshot();
- exec->restoreState(txn); // Handles any WCEs internally.
+ exec->restoreState(); // Handles any WCEs internally.
}
}
diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp
index c642fcb83a5..8f654411988 100644
--- a/src/mongo/db/catalog/index_create.cpp
+++ b/src/mongo/db/catalog/index_create.cpp
@@ -286,7 +286,7 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsO
// around call to abandonSnapshot.
exec->saveState();
_txn->recoveryUnit()->abandonSnapshot();
- exec->restoreState(_txn); // Handles any WCEs internally.
+ exec->restoreState(); // Handles any WCEs internally.
}
}
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 7b45e3444ea..c5499ee7166 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -80,14 +80,15 @@ long long ClientCursor::totalOpen() {
ClientCursor::ClientCursor(CursorManager* cursorManager,
PlanExecutor* exec,
const std::string& ns,
+ bool isReadCommitted,
int qopts,
const BSONObj query,
bool isAggCursor)
: _ns(ns),
+ _isReadCommitted(isReadCommitted),
_cursorManager(cursorManager),
_countedYet(false),
- _isAggCursor(isAggCursor),
- _unownedRU(NULL) {
+ _isAggCursor(isAggCursor) {
_exec.reset(exec);
_query = query;
_queryOptions = qopts;
@@ -99,11 +100,11 @@ ClientCursor::ClientCursor(CursorManager* cursorManager,
ClientCursor::ClientCursor(const Collection* collection)
: _ns(collection->ns().ns()),
+ _isReadCommitted(false),
_cursorManager(collection->getCursorManager()),
_countedYet(false),
_queryOptions(QueryOption_NoCursorTimeout),
- _isAggCursor(false),
- _unownedRU(NULL) {
+ _isAggCursor(false) {
init();
}
@@ -197,30 +198,6 @@ void ClientCursor::updateSlaveLocation(OperationContext* txn) {
}
//
-// Storage engine state for getMore.
-//
-
-void ClientCursor::setUnownedRecoveryUnit(RecoveryUnit* ru) {
- invariant(!_unownedRU);
- invariant(!_ownedRU.get());
- _unownedRU = ru;
-}
-
-RecoveryUnit* ClientCursor::getUnownedRecoveryUnit() const {
- return _unownedRU;
-}
-
-void ClientCursor::setOwnedRecoveryUnit(RecoveryUnit* ru) {
- invariant(!_unownedRU);
- invariant(!_ownedRU.get());
- _ownedRU.reset(ru);
-}
-
-RecoveryUnit* ClientCursor::releaseOwnedRecoveryUnit() {
- return _ownedRU.release();
-}
-
-//
// Pin methods
//
diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h
index 3f7e9797ae5..832d24e9a87 100644
--- a/src/mongo/db/clientcursor.h
+++ b/src/mongo/db/clientcursor.h
@@ -59,12 +59,15 @@ public:
ClientCursor(CursorManager* cursorManager,
PlanExecutor* exec,
const std::string& ns,
+ bool isReadCommitted,
int qopts = 0,
const BSONObj query = BSONObj(),
bool isAggCursor = false);
/**
* This ClientCursor is used to track sharding state for the given collection.
+ *
+ * Do not use outside of RangePreserver!
*/
explicit ClientCursor(const Collection* collection);
@@ -81,6 +84,9 @@ public:
CursorManager* cursorManager() const {
return _cursorManager;
}
+ bool isReadCommitted() const {
+ return _isReadCommitted;
+ }
bool isAggCursor() const {
return _isAggCursor;
}
@@ -180,49 +186,6 @@ public:
static long long totalOpen();
- //
- // Storage engine state for getMore.
- //
-
- bool hasRecoveryUnit() const {
- return _ownedRU.get() || _unownedRU;
- }
-
- /**
- *
- * If a ClientCursor is created via DBDirectClient, it uses the same storage engine
- * context as the DBDirectClient caller. We store this context in _unownedRU. We use
- * this to verify that all further callers use the same RecoveryUnit.
- *
- * Once a ClientCursor has an unowned RecoveryUnit, it will always have one.
- *
- * Sets the unowned RecoveryUnit to 'ru'. Does NOT take ownership of the pointer.
- */
- void setUnownedRecoveryUnit(RecoveryUnit* ru);
-
- /**
- * Return the unowned RecoveryUnit. 'this' does not own pointer and therefore cannot
- * transfer ownership.
- */
- RecoveryUnit* getUnownedRecoveryUnit() const;
-
- /**
- * If a ClientCursor is created via a client request, we bind its lifetime to the
- * ClientCursor's by storing it un _ownedRU. In order to execute the query over repeated
- * network requests, we have to keep the execution state around.
- */
-
- /**
- * Set the owned recovery unit to 'ru'. Takes ownership of it. If there is a previous
- * owned recovery unit, it is deleted.
- */
- void setOwnedRecoveryUnit(RecoveryUnit* ru);
-
- /**
- * Returns the owned recovery unit. Ownership is transferred to the caller.
- */
- RecoveryUnit* releaseOwnedRecoveryUnit();
-
private:
friend class CursorManager;
friend class ClientCursorPin;
@@ -248,6 +211,8 @@ private:
// The namespace we're operating on.
std::string _ns;
+ const bool _isReadCommitted;
+
CursorManager* _cursorManager;
// if we've added it to the total open counter yet
@@ -270,7 +235,7 @@ private:
// should not be killed or destroyed when the underlying collection is deleted.
//
// Note: This should *not* be set for the internal cursor used as input to an aggregation.
- bool _isAggCursor;
+ const bool _isAggCursor;
// Is this cursor in use? Defaults to false.
bool _isPinned;
@@ -288,13 +253,6 @@ private:
// TODO: Document.
uint64_t _leftoverMaxTimeMicros;
- // Only one of these is not-NULL.
- RecoveryUnit* _unownedRU;
- std::unique_ptr<RecoveryUnit> _ownedRU;
- // NOTE: _ownedRU must come before _exec, because _ownedRU must outlive _exec.
- // The storage engine can have resources in the PlanExecutor that rely on
- // the RecoveryUnit being alive.
-
//
// The underlying execution machinery.
//
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index f25010c3ce0..36d9e6f4f82 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -285,11 +285,13 @@ public:
// Create a ClientCursor containing this plan executor. We don't have to worry
// about leaking it as it's inserted into a global map by its ctor.
- ClientCursor* cursor = new ClientCursor(collection->getCursorManager(),
- exec.release(),
- nss.ns(),
- pq.getOptions(),
- pq.getFilter());
+ ClientCursor* cursor =
+ new ClientCursor(collection->getCursorManager(),
+ exec.release(),
+ nss.ns(),
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ pq.getOptions(),
+ pq.getFilter());
CursorId cursorId = cursor->cursorid();
ClientCursorPin ccPin(collection->getCursorManager(), cursorId);
@@ -335,20 +337,10 @@ public:
if (shouldSaveCursor(txn, collection, state, cursorExec)) {
// State will be restored on getMore.
cursorExec->saveState();
+ cursorExec->detachFromOperationContext();
cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros());
cursor->setPos(numResults);
-
- // Don't stash the RU for tailable cursors at EOF, let them get a new RU on their
- // next getMore.
- if (!(pq.isTailable() && state == PlanExecutor::IS_EOF)) {
- // We stash away the RecoveryUnit in the ClientCursor. It's used for
- // subsequent getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
- txn->recoveryUnit()->abandonSnapshot();
- cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
- StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine();
- txn->setRecoveryUnit(engine->newRecoveryUnit(), OperationContext::kNotInUnitOfWork);
- }
} else {
cursorId = 0;
}
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 980ce11703c..06a2d14b97d 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -244,14 +244,8 @@ public:
// On early return, get rid of the cursor.
ScopeGuard cursorFreer = MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin, request);
- if (!cursor->hasRecoveryUnit()) {
- // Start using a new RecoveryUnit.
- cursor->setOwnedRecoveryUnit(
- getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit());
- }
-
- // Swap RecoveryUnit(s) between the ClientCursor and OperationContext.
- ScopedRecoveryUnitSwapper ruSwapper(cursor, txn);
+ if (cursor->isReadCommitted())
+ uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
// Reset timeout timer on the cursor since the cursor is still in use.
cursor->setIdleTime(0);
@@ -270,7 +264,8 @@ public:
}
PlanExecutor* exec = cursor->getExecutor();
- exec->restoreState(txn);
+ exec->reattachToOperationContext(txn);
+ exec->restoreState();
// If we're tailing a capped collection, retrieve a monotonically increasing insert
// counter.
@@ -309,7 +304,7 @@ public:
notifier.reset();
ctx.reset(new AutoGetCollectionForRead(txn, request.nss));
- exec->restoreState(txn);
+ exec->restoreState();
// We woke up because either the timed_wait expired, or there was more data. Either
// way, attempt to generate another batch of results.
@@ -323,6 +318,7 @@ public:
respondWithId = request.cursorid;
exec->saveState();
+ exec->detachFromOperationContext();
// If maxTimeMS was set directly on the getMore rather than being rolled over
// from a previous find, then don't roll remaining micros over to the next
@@ -332,12 +328,6 @@ public:
}
cursor->incPos(numResults);
-
- if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) {
- // Rather than swapping their existing RU into the client cursor, tailable
- // cursors should get a new recovery unit.
- ruSwapper.dismiss();
- }
} else {
CurOp::get(txn)->debug().cursorExhausted = true;
}
diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp
index 8fb6f063590..dd3549d15cc 100644
--- a/src/mongo/db/commands/list_collections.cpp
+++ b/src/mongo/db/commands/list_collections.cpp
@@ -194,8 +194,12 @@ public:
CursorId cursorId = 0LL;
if (!exec->isEOF()) {
exec->saveState();
- ClientCursor* cursor = new ClientCursor(
- CursorManager::getGlobalCursorManager(), exec.release(), cursorNamespace);
+ exec->detachFromOperationContext();
+ ClientCursor* cursor =
+ new ClientCursor(CursorManager::getGlobalCursorManager(),
+ exec.release(),
+ cursorNamespace,
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());
cursorId = cursor->cursorid();
}
diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp
index 06d08e9cce0..0f46e4e7427 100644
--- a/src/mongo/db/commands/list_indexes.cpp
+++ b/src/mongo/db/commands/list_indexes.cpp
@@ -192,8 +192,12 @@ public:
CursorId cursorId = 0LL;
if (!exec->isEOF()) {
exec->saveState();
- ClientCursor* cursor = new ClientCursor(
- CursorManager::getGlobalCursorManager(), exec.release(), cursorNamespace);
+ exec->detachFromOperationContext();
+ ClientCursor* cursor =
+ new ClientCursor(CursorManager::getGlobalCursorManager(),
+ exec.release(),
+ cursorNamespace,
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());
cursorId = cursor->cursorid();
}
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 307f848d3f9..c275157e762 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1048,7 +1048,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) {
prev = o;
all.push_back(o);
- if (!exec->restoreState(_txn)) {
+ if (!exec->restoreState()) {
break;
}
@@ -1429,7 +1429,7 @@ public:
scopedXact.reset(new ScopedTransaction(txn, MODE_IS));
scopedAutoDb.reset(new AutoGetDb(txn, nss.db(), MODE_S));
- exec->restoreState(txn);
+ exec->restoreState();
// Need to reload the database, in case it was dropped after we
// released the lock
diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp
index c1dcf50dfe0..351fc4f7711 100644
--- a/src/mongo/db/commands/parallel_collection_scan.cpp
+++ b/src/mongo/db/commands/parallel_collection_scan.cpp
@@ -39,7 +39,7 @@
#include "mongo/db/query/cursor_responses.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/memory.h"
-#include "mongo/util/touch_pages.h"
+#include "mongo/base/checked_cast.h"
namespace mongo {
@@ -111,7 +111,7 @@ public:
numCursors = iterators.size();
}
- OwnedPointerVector<PlanExecutor> execs;
+ std::vector<std::unique_ptr<PlanExecutor>> execs;
for (size_t i = 0; i < numCursors; i++) {
unique_ptr<WorkingSet> ws = make_unique<WorkingSet>();
unique_ptr<MultiIteratorStage> mis =
@@ -121,50 +121,35 @@ public:
auto statusWithPlanExecutor = PlanExecutor::make(
txn, std::move(ws), std::move(mis), collection, PlanExecutor::YIELD_AUTO);
invariant(statusWithPlanExecutor.isOK());
- unique_ptr<PlanExecutor> curExec = std::move(statusWithPlanExecutor.getValue());
-
- // The PlanExecutor was registered on construction due to the YIELD_AUTO policy.
- // We have to deregister it, as it will be registered with ClientCursor.
- curExec->deregisterExec();
-
- // Need to save state while yielding locks between now and getMore().
- curExec->saveState();
-
- execs.push_back(curExec.release());
+ execs.push_back(std::move(statusWithPlanExecutor.getValue()));
}
// transfer iterators to executors using a round-robin distribution.
// TODO consider using a common work queue once invalidation issues go away.
for (size_t i = 0; i < iterators.size(); i++) {
- PlanExecutor* theExec = execs[i % execs.size()];
- MultiIteratorStage* mis = static_cast<MultiIteratorStage*>(theExec->getRootStage());
-
- // This wasn't called above as they weren't assigned yet
- iterators[i]->savePositioned();
-
+ auto& planExec = execs[i % execs.size()];
+ MultiIteratorStage* mis = checked_cast<MultiIteratorStage*>(planExec->getRootStage());
mis->addIterator(std::move(iterators[i]));
}
{
BSONArrayBuilder bucketsBuilder;
- for (size_t i = 0; i < execs.size(); i++) {
+ for (auto&& exec : execs) {
+ // The PlanExecutor was registered on construction due to the YIELD_AUTO policy.
+ // We have to deregister it, as it will be registered with ClientCursor.
+ exec->deregisterExec();
+
+ // Need to save state while yielding locks between now and getMore().
+ exec->saveState();
+ exec->detachFromOperationContext();
+
// transfer ownership of an executor to the ClientCursor (which manages its own
// lifetime).
ClientCursor* cc =
- new ClientCursor(collection->getCursorManager(), execs.releaseAt(i), ns.ns());
-
- if (cmdObj["$readMajorityTemporaryName"].trueValue()) {
- // Need to make RecoveryUnits for each cursor so that the getMores know to
- // use readMajority. This will need to be replaced with a setting on the
- // client cursor once we resolve SERVER-17364.
- StorageEngine* storageEngine =
- getGlobalServiceContext()->getGlobalStorageEngine();
- std::unique_ptr<RecoveryUnit> newRu(storageEngine->newRecoveryUnit());
- // Wouldn't have entered run() if not supported.
- invariantOK(newRu->setReadFromMajorityCommittedSnapshot());
-
- cc->setOwnedRecoveryUnit(newRu.release());
- }
+ new ClientCursor(collection->getCursorManager(),
+ exec.release(),
+ ns.ns(),
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());
BSONObjBuilder threadResult;
appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &threadResult);
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index 43e9d488427..9d971447dd9 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -130,22 +130,10 @@ static bool handleCursorCommand(OperationContext* txn,
CurOp::get(txn)->debug().cursorid = cursor->cursorid();
- if (txn->getClient()->isInDirectClient()) {
- cursor->setUnownedRecoveryUnit(txn->recoveryUnit());
- } else {
- // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent
- // getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
- txn->recoveryUnit()->abandonSnapshot();
- cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
- StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
- invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(),
- OperationContext::kNotInUnitOfWork) ==
- OperationContext::kNotInUnitOfWork);
- }
-
// Cursor needs to be in a saved state while we yield locks for getmore. State
// will be restored in getMore().
exec->saveState();
+ exec->detachFromOperationContext();
}
const long long cursorId = cursor ? cursor->cursorid() : 0LL;
@@ -263,12 +251,14 @@ public:
if (collection) {
const bool isAggCursor = true; // enable special locking behavior
- ClientCursor* cursor = new ClientCursor(collection->getCursorManager(),
- exec.release(),
- nss.ns(),
- 0,
- cmdObj.getOwned(),
- isAggCursor);
+ ClientCursor* cursor =
+ new ClientCursor(collection->getCursorManager(),
+ exec.release(),
+ nss.ns(),
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ 0,
+ cmdObj.getOwned(),
+ isAggCursor);
pin.reset(new ClientCursorPin(collection->getCursorManager(), cursor->cursorid()));
// Don't add any code between here and the start of the try block.
}
diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp
index eef18cfcfeb..5f2f88c04ca 100644
--- a/src/mongo/db/commands/repair_cursor.cpp
+++ b/src/mongo/db/commands/repair_cursor.cpp
@@ -103,11 +103,15 @@ public:
// it now so that it can be registed with ClientCursor.
exec->deregisterExec();
exec->saveState();
+ exec->detachFromOperationContext();
// ClientCursors' constructor inserts them into a global map that manages their
// lifetimes. That is why the next line isn't leaky.
ClientCursor* cc =
- new ClientCursor(collection->getCursorManager(), exec.release(), ns.ns());
+ new ClientCursor(collection->getCursorManager(),
+ exec.release(),
+ ns.ns(),
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());
appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &result);
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index 8c219052961..e65f249a66f 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -640,7 +640,7 @@ public:
}
// Have the lock again. See if we were killed.
- if (!exec->restoreState(txn)) {
+ if (!exec->restoreState()) {
if (!partialOk) {
uasserted(13281, "File deleted during filemd5 command");
}
diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp
index 7ad6ef16a6e..dd0b751c326 100644
--- a/src/mongo/db/exec/cached_plan.cpp
+++ b/src/mongo/db/exec/cached_plan.cpp
@@ -292,7 +292,7 @@ PlanStage::StageState CachedPlanStage::work(WorkingSetID* out) {
}
-void CachedPlanStage::doRestoreState(OperationContext* opCtx) {
+void CachedPlanStage::doReattachToOperationContext(OperationContext* opCtx) {
_txn = opCtx;
}
diff --git a/src/mongo/db/exec/cached_plan.h b/src/mongo/db/exec/cached_plan.h
index 117e6aa5dfa..7cf4a1ebc88 100644
--- a/src/mongo/db/exec/cached_plan.h
+++ b/src/mongo/db/exec/cached_plan.h
@@ -65,7 +65,7 @@ public:
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual StageType stageType() const {
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 9eee74c4451..ef5fb5f9fee 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -215,23 +215,33 @@ void CollectionScan::doInvalidate(OperationContext* txn,
}
void CollectionScan::doSaveState() {
- _txn = NULL;
if (_cursor) {
_cursor->savePositioned();
}
}
-void CollectionScan::doRestoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
+void CollectionScan::doRestoreState() {
if (_cursor) {
- if (!_cursor->restore(opCtx)) {
- warning() << "Could not restore RecordCursor for CollectionScan: " << opCtx->getNS();
+ if (!_cursor->restore()) {
+ warning() << "Could not restore RecordCursor for CollectionScan: " << _txn->getNS();
_isDead = true;
}
}
}
+void CollectionScan::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_cursor)
+ _cursor->detachFromOperationContext();
+}
+
+void CollectionScan::doReattachToOperationContext(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ if (_cursor)
+ _cursor->reattachToOperationContext(opCtx);
+}
+
unique_ptr<PlanStageStats> CollectionScan::getStats() {
// Add a BSON representation of the filter to the stats tree, if there is one.
if (NULL != _filter) {
diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h
index 856af5e10ad..e08d583a7a3 100644
--- a/src/mongo/db/exec/collection_scan.h
+++ b/src/mongo/db/exec/collection_scan.h
@@ -59,7 +59,9 @@ public:
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_COLLSCAN;
diff --git a/src/mongo/db/exec/count.cpp b/src/mongo/db/exec/count.cpp
index bcc389a488b..bb21de3cf81 100644
--- a/src/mongo/db/exec/count.cpp
+++ b/src/mongo/db/exec/count.cpp
@@ -163,7 +163,7 @@ PlanStage::StageState CountStage::work(WorkingSetID* out) {
return PlanStage::NEED_TIME;
}
-void CountStage::doRestoreState(OperationContext* opCtx) {
+void CountStage::doReattachToOperationContext(OperationContext* opCtx) {
_txn = opCtx;
}
diff --git a/src/mongo/db/exec/count.h b/src/mongo/db/exec/count.h
index ccb4a4b364a..110c9923119 100644
--- a/src/mongo/db/exec/count.h
+++ b/src/mongo/db/exec/count.h
@@ -56,7 +56,7 @@ public:
virtual bool isEOF();
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_COUNT;
diff --git a/src/mongo/db/exec/count_scan.cpp b/src/mongo/db/exec/count_scan.cpp
index e6a9f4b667c..6ba9d710f5d 100644
--- a/src/mongo/db/exec/count_scan.cpp
+++ b/src/mongo/db/exec/count_scan.cpp
@@ -121,23 +121,32 @@ bool CountScan::isEOF() {
}
void CountScan::doSaveState() {
- _txn = NULL;
if (_cursor)
_cursor->savePositioned();
}
-void CountScan::doRestoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
-
+void CountScan::doRestoreState() {
if (_cursor)
- _cursor->restore(opCtx);
+ _cursor->restore();
// This can change during yielding.
// TODO this isn't sufficient. See SERVER-17678.
_shouldDedup = _descriptor->isMultikey(_txn);
}
+void CountScan::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_cursor)
+ _cursor->detachFromOperationContext();
+}
+
+void CountScan::doReattachToOperationContext(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ if (_cursor)
+ _cursor->reattachToOperationContext(opCtx);
+}
+
void CountScan::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
// The only state we're responsible for holding is what RecordIds to drop. If a document
// mutates the underlying index cursor will deal with it.
diff --git a/src/mongo/db/exec/count_scan.h b/src/mongo/db/exec/count_scan.h
index ffb0035417d..dbbd85c5715 100644
--- a/src/mongo/db/exec/count_scan.h
+++ b/src/mongo/db/exec/count_scan.h
@@ -71,7 +71,9 @@ public:
virtual StageState work(WorkingSetID* out);
virtual bool isEOF();
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual StageType stageType() const {
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp
index bdc78806521..4a9f6638c2d 100644
--- a/src/mongo/db/exec/delete.cpp
+++ b/src/mongo/db/exec/delete.cpp
@@ -207,7 +207,7 @@ PlanStage::StageState DeleteStage::work(WorkingSetID* out) {
// transaction in which they are created, and a WriteUnitOfWork is a
// transaction, make sure to restore the state outside of the WritUnitOfWork.
try {
- child()->restoreState(_txn);
+ child()->restoreState();
} catch (const WriteConflictException& wce) {
// Note we don't need to retry anything in this case since the delete already
// was committed. However, we still need to return the deleted document
@@ -257,8 +257,7 @@ PlanStage::StageState DeleteStage::work(WorkingSetID* out) {
return status;
}
-void DeleteStage::doRestoreState(OperationContext* opCtx) {
- _txn = opCtx;
+void DeleteStage::doRestoreState() {
const NamespaceString& ns(_collection->ns());
massert(28537,
str::stream() << "Demoted from primary while removing from " << ns.ns(),
@@ -266,6 +265,10 @@ void DeleteStage::doRestoreState(OperationContext* opCtx) {
repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns));
}
+void DeleteStage::doReattachToOperationContext(OperationContext* opCtx) {
+ _txn = opCtx;
+}
+
unique_ptr<PlanStageStats> DeleteStage::getStats() {
_commonStats.isEOF = isEOF();
unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_DELETE);
diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h
index 76f6afdb123..1b85a37474d 100644
--- a/src/mongo/db/exec/delete.h
+++ b/src/mongo/db/exec/delete.h
@@ -89,7 +89,8 @@ public:
virtual bool isEOF();
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_DELETE;
diff --git a/src/mongo/db/exec/distinct_scan.cpp b/src/mongo/db/exec/distinct_scan.cpp
index d838177b456..f663bf99cee 100644
--- a/src/mongo/db/exec/distinct_scan.cpp
+++ b/src/mongo/db/exec/distinct_scan.cpp
@@ -129,19 +129,27 @@ bool DistinctScan::isEOF() {
}
void DistinctScan::doSaveState() {
- _txn = NULL;
-
// We always seek, so we don't care where the cursor is.
if (_cursor)
_cursor->saveUnpositioned();
}
-void DistinctScan::doRestoreState(OperationContext* opCtx) {
+void DistinctScan::doRestoreState() {
+ if (_cursor)
+ _cursor->restore();
+}
+
+void DistinctScan::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_cursor)
+ _cursor->detachFromOperationContext();
+}
+
+void DistinctScan::doReattachToOperationContext(OperationContext* opCtx) {
invariant(_txn == NULL);
_txn = opCtx;
-
if (_cursor)
- _cursor->restore(opCtx);
+ _cursor->reattachToOperationContext(opCtx);
}
unique_ptr<PlanStageStats> DistinctScan::getStats() {
diff --git a/src/mongo/db/exec/distinct_scan.h b/src/mongo/db/exec/distinct_scan.h
index b39ab6e9acc..36b20477a24 100644
--- a/src/mongo/db/exec/distinct_scan.h
+++ b/src/mongo/db/exec/distinct_scan.h
@@ -80,7 +80,9 @@ public:
virtual StageState work(WorkingSetID* out);
virtual bool isEOF();
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_DISTINCT_SCAN;
diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp
index c31d9b3f45f..85c404c17e6 100644
--- a/src/mongo/db/exec/fetch.cpp
+++ b/src/mongo/db/exec/fetch.cpp
@@ -160,16 +160,26 @@ PlanStage::StageState FetchStage::work(WorkingSetID* out) {
}
void FetchStage::doSaveState() {
- _txn = NULL;
if (_cursor)
_cursor->saveUnpositioned();
}
-void FetchStage::doRestoreState(OperationContext* opCtx) {
+void FetchStage::doRestoreState() {
+ if (_cursor)
+ _cursor->restore();
+}
+
+void FetchStage::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_cursor)
+ _cursor->detachFromOperationContext();
+}
+
+void FetchStage::doReattachToOperationContext(OperationContext* opCtx) {
invariant(_txn == NULL);
_txn = opCtx;
if (_cursor)
- _cursor->restore(opCtx);
+ _cursor->reattachToOperationContext(opCtx);
}
void FetchStage::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h
index edfd6c35a5f..4e1bffcdeda 100644
--- a/src/mongo/db/exec/fetch.h
+++ b/src/mongo/db/exec/fetch.h
@@ -61,7 +61,9 @@ public:
virtual StageState work(WorkingSetID* out);
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual StageType stageType() const {
diff --git a/src/mongo/db/exec/group.cpp b/src/mongo/db/exec/group.cpp
index a3049a1f787..891fc232796 100644
--- a/src/mongo/db/exec/group.cpp
+++ b/src/mongo/db/exec/group.cpp
@@ -257,7 +257,7 @@ bool GroupStage::isEOF() {
return _groupState == GroupState_Done;
}
-void GroupStage::doRestoreState(OperationContext* opCtx) {
+void GroupStage::doReattachToOperationContext(OperationContext* opCtx) {
_txn = opCtx;
}
diff --git a/src/mongo/db/exec/group.h b/src/mongo/db/exec/group.h
index 7f282fce25c..97fb39d27e7 100644
--- a/src/mongo/db/exec/group.h
+++ b/src/mongo/db/exec/group.h
@@ -90,7 +90,7 @@ public:
virtual StageState work(WorkingSetID* out);
virtual bool isEOF();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_GROUP;
diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp
index 2d9a67cfa4f..afc7507a54f 100644
--- a/src/mongo/db/exec/idhack.cpp
+++ b/src/mongo/db/exec/idhack.cpp
@@ -200,16 +200,26 @@ PlanStage::StageState IDHackStage::advance(WorkingSetID id,
}
void IDHackStage::doSaveState() {
- _txn = NULL;
if (_recordCursor)
_recordCursor->saveUnpositioned();
}
-void IDHackStage::doRestoreState(OperationContext* opCtx) {
+void IDHackStage::doRestoreState() {
+ if (_recordCursor)
+ _recordCursor->restore();
+}
+
+void IDHackStage::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_recordCursor)
+ _recordCursor->detachFromOperationContext();
+}
+
+void IDHackStage::doReattachToOperationContext(OperationContext* opCtx) {
invariant(_txn == NULL);
_txn = opCtx;
if (_recordCursor)
- _recordCursor->restore(opCtx);
+ _recordCursor->reattachToOperationContext(opCtx);
}
void IDHackStage::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
diff --git a/src/mongo/db/exec/idhack.h b/src/mongo/db/exec/idhack.h
index 52446c064e2..19b12cbbc09 100644
--- a/src/mongo/db/exec/idhack.h
+++ b/src/mongo/db/exec/idhack.h
@@ -59,7 +59,9 @@ public:
virtual StageState work(WorkingSetID* out);
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
/**
diff --git a/src/mongo/db/exec/index_scan.cpp b/src/mongo/db/exec/index_scan.cpp
index 34f93786b6f..32dfb4c8121 100644
--- a/src/mongo/db/exec/index_scan.cpp
+++ b/src/mongo/db/exec/index_scan.cpp
@@ -234,12 +234,6 @@ bool IndexScan::isEOF() {
}
void IndexScan::doSaveState() {
- if (!_txn) {
- // We were already saved. Nothing to do.
- return;
- }
- _txn = NULL;
-
if (!_indexCursor)
return;
@@ -251,11 +245,22 @@ void IndexScan::doSaveState() {
_indexCursor->savePositioned();
}
-void IndexScan::doRestoreState(OperationContext* opCtx) {
+void IndexScan::doRestoreState() {
+ if (_indexCursor)
+ _indexCursor->restore();
+}
+
+void IndexScan::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_indexCursor)
+ _indexCursor->detachFromOperationContext();
+}
+
+void IndexScan::doReattachToOperationContext(OperationContext* opCtx) {
invariant(_txn == NULL);
_txn = opCtx;
if (_indexCursor)
- _indexCursor->restore(opCtx);
+ _indexCursor->reattachToOperationContext(opCtx);
}
void IndexScan::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h
index d1a721588ac..5ae2319800c 100644
--- a/src/mongo/db/exec/index_scan.h
+++ b/src/mongo/db/exec/index_scan.h
@@ -100,7 +100,9 @@ public:
virtual StageState work(WorkingSetID* out);
virtual bool isEOF();
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual StageType stageType() const {
diff --git a/src/mongo/db/exec/multi_iterator.cpp b/src/mongo/db/exec/multi_iterator.cpp
index f3de28c5e54..f08052e06f3 100644
--- a/src/mongo/db/exec/multi_iterator.cpp
+++ b/src/mongo/db/exec/multi_iterator.cpp
@@ -107,19 +107,31 @@ void MultiIteratorStage::kill() {
}
void MultiIteratorStage::doSaveState() {
+ for (auto&& iterator : _iterators) {
+ iterator->savePositioned();
+ }
+}
+
+void MultiIteratorStage::doRestoreState() {
+ for (auto&& iterator : _iterators) {
+ if (!iterator->restore()) {
+ kill();
+ }
+ }
+}
+
+void MultiIteratorStage::doDetachFromOperationContext() {
_txn = NULL;
- for (size_t i = 0; i < _iterators.size(); i++) {
- _iterators[i]->savePositioned();
+ for (auto&& iterator : _iterators) {
+ iterator->detachFromOperationContext();
}
}
-void MultiIteratorStage::doRestoreState(OperationContext* opCtx) {
+void MultiIteratorStage::doReattachToOperationContext(OperationContext* opCtx) {
invariant(_txn == NULL);
_txn = opCtx;
- for (size_t i = 0; i < _iterators.size(); i++) {
- if (!_iterators[i]->restore(opCtx)) {
- kill();
- }
+ for (auto&& iterator : _iterators) {
+ iterator->reattachToOperationContext(opCtx);
}
}
diff --git a/src/mongo/db/exec/multi_iterator.h b/src/mongo/db/exec/multi_iterator.h
index a0b16d57515..9bcbdb3c1c1 100644
--- a/src/mongo/db/exec/multi_iterator.h
+++ b/src/mongo/db/exec/multi_iterator.h
@@ -60,7 +60,9 @@ public:
void kill();
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
// Returns empty PlanStageStats object
diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp
index de58312a4b6..18188ab6b68 100644
--- a/src/mongo/db/exec/multi_plan.cpp
+++ b/src/mongo/db/exec/multi_plan.cpp
@@ -388,7 +388,7 @@ bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolic
return !doneWorking;
}
-void MultiPlanStage::doRestoreState(OperationContext* opCtx) {
+void MultiPlanStage::doReattachToOperationContext(OperationContext* opCtx) {
_txn = opCtx;
}
diff --git a/src/mongo/db/exec/multi_plan.h b/src/mongo/db/exec/multi_plan.h
index 669197f648e..30b5f5f9bbf 100644
--- a/src/mongo/db/exec/multi_plan.h
+++ b/src/mongo/db/exec/multi_plan.h
@@ -66,7 +66,7 @@ public:
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
diff --git a/src/mongo/db/exec/near.cpp b/src/mongo/db/exec/near.cpp
index a79ae985bd5..ceeda89c192 100644
--- a/src/mongo/db/exec/near.cpp
+++ b/src/mongo/db/exec/near.cpp
@@ -284,7 +284,7 @@ bool NearStage::isEOF() {
return SearchState_Finished == _searchState;
}
-void NearStage::doRestoreState(OperationContext* opCtx) {
+void NearStage::doReattachToOperationContext(OperationContext* opCtx) {
_txn = opCtx;
}
diff --git a/src/mongo/db/exec/near.h b/src/mongo/db/exec/near.h
index 09a743ae147..57773e42b7c 100644
--- a/src/mongo/db/exec/near.h
+++ b/src/mongo/db/exec/near.h
@@ -82,7 +82,7 @@ public:
virtual bool isEOF();
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual StageType stageType() const;
diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp
index 33b41aed43d..f43272caf47 100644
--- a/src/mongo/db/exec/oplogstart.cpp
+++ b/src/mongo/db/exec/oplogstart.cpp
@@ -181,18 +181,14 @@ void OplogStart::doInvalidate(OperationContext* txn, const RecordId& dl, Invalid
}
void OplogStart::doSaveState() {
- _txn = NULL;
for (size_t i = 0; i < _subIterators.size(); i++) {
_subIterators[i]->savePositioned();
}
}
-void OplogStart::doRestoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
-
+void OplogStart::doRestoreState() {
for (size_t i = 0; i < _subIterators.size(); i++) {
- if (!_subIterators[i]->restore(opCtx)) {
+ if (!_subIterators[i]->restore()) {
_subIterators.erase(_subIterators.begin() + i);
// need to hit same i on next pass through loop
i--;
@@ -200,6 +196,21 @@ void OplogStart::doRestoreState(OperationContext* opCtx) {
}
}
+void OplogStart::doDetachFromOperationContext() {
+ _txn = NULL;
+ for (auto&& iterator : _subIterators) {
+ iterator->detachFromOperationContext();
+ }
+}
+
+void OplogStart::doReattachToOperationContext(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ for (auto&& iterator : _subIterators) {
+ iterator->reattachToOperationContext(opCtx);
+ }
+}
+
unique_ptr<PlanStageStats> OplogStart::getStats() {
unique_ptr<PlanStageStats> ret =
make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_OPLOG_START);
diff --git a/src/mongo/db/exec/oplogstart.h b/src/mongo/db/exec/oplogstart.h
index d26d8d63b4e..14db9e9b585 100644
--- a/src/mongo/db/exec/oplogstart.h
+++ b/src/mongo/db/exec/oplogstart.h
@@ -71,7 +71,9 @@ public:
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
// Returns empty PlanStageStats object
virtual std::unique_ptr<PlanStageStats> getStats();
diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp
index 5c6fa17b251..36e2eff10b6 100644
--- a/src/mongo/db/exec/pipeline_proxy.cpp
+++ b/src/mongo/db/exec/pipeline_proxy.cpp
@@ -100,13 +100,19 @@ void PipelineProxyStage::doInvalidate(OperationContext* txn,
}
}
-void PipelineProxyStage::doSaveState() {
+void PipelineProxyStage::doDetachFromOperationContext() {
_pipeline->getContext()->opCtx = NULL;
+ if (auto child = getChildExecutor()) {
+ child->detachFromOperationContext();
+ }
}
-void PipelineProxyStage::doRestoreState(OperationContext* opCtx) {
+void PipelineProxyStage::doReattachToOperationContext(OperationContext* opCtx) {
invariant(_pipeline->getContext()->opCtx == NULL);
_pipeline->getContext()->opCtx = opCtx;
+ if (auto child = getChildExecutor()) {
+ child->reattachToOperationContext(opCtx);
+ }
}
void PipelineProxyStage::pushBack(const BSONObj& obj) {
diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h
index 517cf6ef393..84f1a867f2e 100644
--- a/src/mongo/db/exec/pipeline_proxy.h
+++ b/src/mongo/db/exec/pipeline_proxy.h
@@ -55,11 +55,10 @@ public:
virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
//
- // Manage our OperationContext. We intentionally don't propagate to the child
- // Runner as that is handled by DocumentSourceCursor as it needs to.
+ // Manage our OperationContext.
//
- virtual void doSaveState();
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doDetachFromOperationContext();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
/**
* Make obj the next object returned by getNext().
diff --git a/src/mongo/db/exec/plan_stage.cpp b/src/mongo/db/exec/plan_stage.cpp
index 5c0f9c95472..13303e94acd 100644
--- a/src/mongo/db/exec/plan_stage.cpp
+++ b/src/mongo/db/exec/plan_stage.cpp
@@ -43,13 +43,13 @@ void PlanStage::saveState() {
doSaveState();
}
-void PlanStage::restoreState(OperationContext* opCtx) {
+void PlanStage::restoreState() {
++_commonStats.unyields;
for (auto&& child : _children) {
- child->restoreState(opCtx);
+ child->restoreState();
}
- doRestoreState(opCtx);
+ doRestoreState();
}
void PlanStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
@@ -61,4 +61,20 @@ void PlanStage::invalidate(OperationContext* txn, const RecordId& dl, Invalidati
doInvalidate(txn, dl, type);
}
+void PlanStage::detachFromOperationContext() {
+ for (auto&& child : _children) {
+ child->detachFromOperationContext();
+ }
+
+ doDetachFromOperationContext();
+}
+
+void PlanStage::reattachToOperationContext(OperationContext* opCtx) {
+ for (auto&& child : _children) {
+ child->reattachToOperationContext(opCtx);
+ }
+
+ doReattachToOperationContext(opCtx);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h
index 85884f47f9c..cd552144def 100644
--- a/src/mongo/db/exec/plan_stage.h
+++ b/src/mongo/db/exec/plan_stage.h
@@ -221,7 +221,28 @@ public:
*
* Propagates to all children, then calls doRestoreState().
*/
- void restoreState(OperationContext* opCtx);
+ void restoreState();
+
+ /**
+ * Detaches from the OperationContext and releases any storage-engine state.
+ *
+ * It is only legal to call this when in a "saved" state. While in the "detached" state, it is
+ * only legal to call reattachToOperationContext or the destructor. It is not legal to call
+ * detachFromOperationContext() while already in the detached state.
+ *
+ * Propagates to all children, then calls doDetachFromOperationContext().
+ */
+ void detachFromOperationContext();
+
+ /**
+ * Reattaches to the OperationContext and reacquires any storage-engine state.
+ *
+ * It is only legal to call this in the "detached" state. On return, the cursor is left in a
+ * "saved" state, so callers must still call restoreState to use this object.
+ *
+ * Propagates to all children, then calls doReattachToOperationContext().
+ */
+ void reattachToOperationContext(OperationContext* opCtx);
/**
* Notifies a stage that a RecordId is going to be deleted (or in-place updated) so that the
@@ -297,11 +318,22 @@ protected:
/**
* Restores any stage-specific saved state and prepares to handle calls to work().
+ */
+ virtual void doRestoreState() {}
+
+ /**
+ * Does stage-specific detaching.
+ */
+ virtual void doDetachFromOperationContext() {}
+
+ /**
+ * Does stage-specific attaching.
*
* If the stage needs an OperationContext during its execution, it may keep a handle to the
- * provided OperationContext (which is valid until the next call to saveState()).
+ * provided OperationContext (which is valid until the next call to
+ * doDetachFromOperationContext()).
*/
- virtual void doRestoreState(OperationContext* txn) {}
+ virtual void doReattachToOperationContext(OperationContext* opCtx) {}
/**
* Does the stage-specific invalidation work.
diff --git a/src/mongo/db/exec/queued_data_stage_test.cpp b/src/mongo/db/exec/queued_data_stage_test.cpp
index 3b2e89f2577..c3aa0e88d63 100644
--- a/src/mongo/db/exec/queued_data_stage_test.cpp
+++ b/src/mongo/db/exec/queued_data_stage_test.cpp
@@ -91,7 +91,7 @@ TEST(QueuedDataStageTest, validateStats) {
ASSERT_EQUALS(stats->yields, 1U);
// unyields
- mock->restoreState(NULL);
+ mock->restoreState();
ASSERT_EQUALS(stats->unyields, 1U);
// invalidates
diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp
index bb9b0abb14f..7c6e5584d13 100644
--- a/src/mongo/db/exec/subplan.cpp
+++ b/src/mongo/db/exec/subplan.cpp
@@ -530,7 +530,7 @@ PlanStage::StageState SubplanStage::work(WorkingSetID* out) {
return state;
}
-void SubplanStage::doRestoreState(OperationContext* opCtx) {
+void SubplanStage::doReattachToOperationContext(OperationContext* opCtx) {
_txn = opCtx;
}
diff --git a/src/mongo/db/exec/subplan.h b/src/mongo/db/exec/subplan.h
index d1a54db9d34..f85ce59b542 100644
--- a/src/mongo/db/exec/subplan.h
+++ b/src/mongo/db/exec/subplan.h
@@ -77,7 +77,7 @@ public:
virtual bool isEOF();
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_SUBPLAN;
diff --git a/src/mongo/db/exec/text_or.cpp b/src/mongo/db/exec/text_or.cpp
index d1e57871593..41c57530a19 100644
--- a/src/mongo/db/exec/text_or.cpp
+++ b/src/mongo/db/exec/text_or.cpp
@@ -79,20 +79,30 @@ bool TextOrStage::isEOF() {
}
void TextOrStage::doSaveState() {
- _txn = NULL;
if (_recordCursor) {
_recordCursor->saveUnpositioned();
}
}
-void TextOrStage::doRestoreState(OperationContext* opCtx) {
- invariant(_txn == NULL);
- _txn = opCtx;
+void TextOrStage::doRestoreState() {
if (_recordCursor) {
- invariant(_recordCursor->restore(opCtx));
+ invariant(_recordCursor->restore());
}
}
+void TextOrStage::doDetachFromOperationContext() {
+ _txn = NULL;
+ if (_recordCursor)
+ _recordCursor->detachFromOperationContext();
+}
+
+void TextOrStage::doReattachToOperationContext(OperationContext* opCtx) {
+ invariant(_txn == NULL);
+ _txn = opCtx;
+ if (_recordCursor)
+ _recordCursor->reattachToOperationContext(opCtx);
+}
+
void TextOrStage::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
// Remove the RecordID from the ScoreMap.
ScoreMap::iterator scoreIt = _scores.find(dl);
diff --git a/src/mongo/db/exec/text_or.h b/src/mongo/db/exec/text_or.h
index a51a91a3919..c2018bda166 100644
--- a/src/mongo/db/exec/text_or.h
+++ b/src/mongo/db/exec/text_or.h
@@ -87,7 +87,9 @@ public:
StageState work(WorkingSetID* out) final;
void doSaveState() final;
- void doRestoreState(OperationContext* opCtx) final;
+ void doRestoreState() final;
+ void doDetachFromOperationContext() final;
+ void doReattachToOperationContext(OperationContext* opCtx) final;
void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final;
StageType stageType() const final {
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp
index 970137bc6e9..a5147ed6e3b 100644
--- a/src/mongo/db/exec/update.cpp
+++ b/src/mongo/db/exec/update.cpp
@@ -902,7 +902,7 @@ PlanStage::StageState UpdateStage::work(WorkingSetID* out) {
// As restoreState may restore (recreate) cursors, make sure to restore the
// state outside of the WritUnitOfWork.
try {
- child()->restoreState(_txn);
+ child()->restoreState();
} catch (const WriteConflictException& wce) {
// Note we don't need to retry updating anything in this case since the update
// already was committed. However, we still need to return the updated document
@@ -958,12 +958,12 @@ PlanStage::StageState UpdateStage::work(WorkingSetID* out) {
return status;
}
-Status UpdateStage::restoreUpdateState(OperationContext* opCtx) {
+Status UpdateStage::restoreUpdateState() {
const UpdateRequest& request = *_params.request;
const NamespaceString& nsString(request.getNamespaceString());
// We may have stepped down during the yield.
- bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() &&
+ bool userInitiatedWritesAndNotPrimary = _txn->writesAreReplicated() &&
!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString);
if (userInitiatedWritesAndNotPrimary) {
@@ -982,17 +982,19 @@ Status UpdateStage::restoreUpdateState(OperationContext* opCtx) {
17270);
}
- _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(opCtx));
+ _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(_txn));
}
return Status::OK();
}
-void UpdateStage::doRestoreState(OperationContext* opCtx) {
- _txn = opCtx;
- uassertStatusOK(restoreUpdateState(opCtx));
+void UpdateStage::doRestoreState() {
+ uassertStatusOK(restoreUpdateState());
}
+void UpdateStage::doReattachToOperationContext(OperationContext* opCtx) {
+ _txn = opCtx;
+}
unique_ptr<PlanStageStats> UpdateStage::getStats() {
_commonStats.isEOF = isEOF();
diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h
index 4d59dca50f2..c728bda5392 100644
--- a/src/mongo/db/exec/update.h
+++ b/src/mongo/db/exec/update.h
@@ -84,7 +84,8 @@ public:
virtual bool isEOF();
virtual StageState work(WorkingSetID* out);
- virtual void doRestoreState(OperationContext* opCtx);
+ virtual void doRestoreState();
+ virtual void doReattachToOperationContext(OperationContext* opCtx);
virtual StageType stageType() const {
return STAGE_UPDATE;
@@ -163,7 +164,7 @@ private:
/**
* Helper for restoring the state of this update.
*/
- Status restoreUpdateState(OperationContext* opCtx);
+ Status restoreUpdateState();
// Transactional context. Not owned by us.
OperationContext* _txn;
diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp
index bd05e7712d8..f2c0166876f 100644
--- a/src/mongo/db/operation_context_impl.cpp
+++ b/src/mongo/db/operation_context_impl.cpp
@@ -99,8 +99,6 @@ RecoveryUnit* OperationContextImpl::recoveryUnit() const {
}
RecoveryUnit* OperationContextImpl::releaseRecoveryUnit() {
- if (_recovery.get())
- _recovery->beingReleasedFromOperationContext();
return _recovery.release();
}
@@ -109,8 +107,6 @@ OperationContext::RecoveryUnitState OperationContextImpl::setRecoveryUnit(Recove
_recovery.reset(unit);
RecoveryUnitState oldState = _ruState;
_ruState = state;
- if (unit)
- unit->beingSetOnOperationContext();
return oldState;
}
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 702852f53b2..12a1725f7fd 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -88,7 +88,7 @@ void DocumentSourceCursor::loadBatch() {
const NamespaceString nss(_ns);
AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss);
- _exec->restoreState(pExpCtx->opCtx);
+ _exec->restoreState();
int memUsageBytes = 0;
BSONObj obj;
@@ -172,7 +172,7 @@ Value DocumentSourceCursor::serialize(bool explain) const {
massert(17392, "No _exec. Were we disposed before explained?", _exec);
- _exec->restoreState(pExpCtx->opCtx);
+ _exec->restoreState();
Explain::explainStages(_exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder);
_exec->saveState();
}
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 30c270eced1..5a65d218b2c 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -73,36 +73,6 @@ const int32_t MaxBytesToReturnToClientAtOnce = 4 * 1024 * 1024;
// Failpoint for checking whether we've received a getmore.
MONGO_FP_DECLARE(failReceivedGetmore);
-ScopedRecoveryUnitSwapper::ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn)
- : _cc(cc), _txn(txn), _dismissed(false) {
- // Save this for later. We restore it upon destruction.
- _txn->recoveryUnit()->abandonSnapshot();
- _txnPreviousRecoveryUnit.reset(txn->releaseRecoveryUnit());
-
- // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx.
- RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit();
- _txnPreviousRecoveryUnitState =
- txn->setRecoveryUnit(ccRecoveryUnit, OperationContext::kNotInUnitOfWork);
-}
-
-void ScopedRecoveryUnitSwapper::dismiss() {
- _dismissed = true;
-}
-
-ScopedRecoveryUnitSwapper::~ScopedRecoveryUnitSwapper() {
- _txn->recoveryUnit()->abandonSnapshot();
-
- if (_dismissed) {
- // Just clean up the recovery unit which we originally got from the ClientCursor.
- delete _txn->releaseRecoveryUnit();
- } else {
- // Swap the RU back into the ClientCursor for subsequent getMores.
- _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit());
- }
-
- _txn->setRecoveryUnit(_txnPreviousRecoveryUnit.release(), _txnPreviousRecoveryUnitState);
-}
-
/**
* If ntoreturn is zero, we stop generating additional results as soon as we have either 101
* documents or at least 1MB of data. On subsequent getmores, there is no limit on the number
@@ -359,16 +329,6 @@ QueryResult::View getMore(OperationContext* txn,
// CC, so don't delete it.
ClientCursorPin ccPin(cursorManager, cursorid);
ClientCursor* cc = ccPin.c();
-
- // If we're not being called from DBDirectClient we want to associate the RecoveryUnit
- // used to create the execution machinery inside the cursor with our OperationContext.
- // If we throw or otherwise exit this method in a disorderly fashion, we must ensure
- // that further calls to getMore won't fail, and that the provided OperationContext
- // has a valid RecoveryUnit. As such, we use RAII to accomplish this.
- //
- // This must be destroyed before the ClientCursor is destroyed.
- unique_ptr<ScopedRecoveryUnitSwapper> ruSwapper;
-
// These are set in the QueryResult msg we return.
int resultFlags = ResultFlag_AwaitCapable;
@@ -392,19 +352,8 @@ QueryResult::View getMore(OperationContext* txn,
ns == cc->ns());
*isCursorAuthorized = true;
- // Restore the RecoveryUnit if we need to.
- if (txn->getClient()->isInDirectClient()) {
- if (cc->hasRecoveryUnit())
- invariant(txn->recoveryUnit() == cc->getUnownedRecoveryUnit());
- } else {
- if (!cc->hasRecoveryUnit()) {
- // Start using a new RecoveryUnit
- cc->setOwnedRecoveryUnit(
- getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit());
- }
- // Swap RecoveryUnit(s) between the ClientCursor and OperationContext.
- ruSwapper = make_unique<ScopedRecoveryUnitSwapper>(cc, txn);
- }
+ if (cc->isReadCommitted())
+ uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot());
// Reset timeout timer on the cursor since the cursor is still in use.
cc->setIdleTime(0);
@@ -436,7 +385,8 @@ QueryResult::View getMore(OperationContext* txn,
startingResult = cc->pos();
PlanExecutor* exec = cc->getExecutor();
- exec->restoreState(txn);
+ exec->reattachToOperationContext(txn);
+ exec->restoreState();
PlanExecutor::ExecState state;
@@ -463,7 +413,7 @@ QueryResult::View getMore(OperationContext* txn,
// Reacquiring locks.
ctx = make_unique<AutoGetCollectionForRead>(txn, nss);
- exec->restoreState(txn);
+ exec->restoreState();
// We woke up because either the timed_wait expired, or there was more data. Either
// way, attempt to generate another batch of results.
@@ -485,7 +435,6 @@ QueryResult::View getMore(OperationContext* txn,
// pin. Because our ClientCursorPin is declared after our lock is declared, this
// will happen under the lock.
if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) {
- ruSwapper.reset();
ccPin.deleteUnderlying();
// cc is now invalid, as is the executor
@@ -499,16 +448,10 @@ QueryResult::View getMore(OperationContext* txn,
// Continue caching the ClientCursor.
cc->incPos(numResults);
exec->saveState();
+ exec->detachFromOperationContext();
LOG(5) << "getMore saving client cursor ended with state "
<< PlanExecutor::statestr(state) << endl;
- if (PlanExecutor::IS_EOF == state && isCursorTailable(cc)) {
- if (!txn->getClient()->isInDirectClient()) {
- // Don't stash the RU. Get a new one on the next getMore.
- ruSwapper->dismiss();
- }
- }
-
// Possibly note slave's position in the oplog.
if ((cc->queryOptions() & QueryOption_OplogReplay) && !slaveReadTill.isNull()) {
cc->slaveReadTill(slaveReadTill);
@@ -698,32 +641,19 @@ std::string runQuery(OperationContext* txn,
if (shouldSaveCursor(txn, collection, state, exec.get())) {
// We won't use the executor until it's getMore'd.
exec->saveState();
+ exec->detachFromOperationContext();
// Allocate a new ClientCursor. We don't have to worry about leaking it as it's
// inserted into a global map by its ctor.
- ClientCursor* cc = new ClientCursor(collection->getCursorManager(),
- exec.release(),
- nss.ns(),
- pq.getOptions(),
- pq.getFilter());
+ ClientCursor* cc =
+ new ClientCursor(collection->getCursorManager(),
+ exec.release(),
+ nss.ns(),
+ txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
+ pq.getOptions(),
+ pq.getFilter());
ccId = cc->cursorid();
- if (txn->getClient()->isInDirectClient()) {
- cc->setUnownedRecoveryUnit(txn->recoveryUnit());
- } else if (state == PlanExecutor::IS_EOF && pq.isTailable()) {
- // Don't stash the RU for tailable cursors at EOF, let them get a new RU on their
- // next getMore.
- } else {
- // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent
- // getMore requests. The calling OpCtx gets a fresh RecoveryUnit.
- txn->recoveryUnit()->abandonSnapshot();
- cc->setOwnedRecoveryUnit(txn->releaseRecoveryUnit());
- StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
- invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(),
- OperationContext::kNotInUnitOfWork) ==
- OperationContext::kNotInUnitOfWork);
- }
-
LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults
<< " results" << endl;
diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h
index 04675f44d7b..8afaf16b334 100644
--- a/src/mongo/db/query/find.h
+++ b/src/mongo/db/query/find.h
@@ -41,27 +41,6 @@ namespace mongo {
class NamespaceString;
class OperationContext;
-class ScopedRecoveryUnitSwapper {
-public:
- ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn);
-
- ~ScopedRecoveryUnitSwapper();
-
- /**
- * Dismissing the RU swapper causes it to simply free the recovery unit rather than swapping
- * it back into the ClientCursor.
- */
- void dismiss();
-
-private:
- ClientCursor* _cc;
- OperationContext* _txn;
- bool _dismissed;
-
- std::unique_ptr<RecoveryUnit> _txnPreviousRecoveryUnit;
- OperationContext::RecoveryUnitState _txnPreviousRecoveryUnitState;
-};
-
/**
* Returns true if enough results have been prepared to stop adding more to the first batch.
*
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index dfc97724b6f..f163e0637b1 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -178,6 +178,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx,
}
Status PlanExecutor::pickBestPlan(YieldPolicy policy) {
+ invariant(_currentState == kUsable);
// For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and
// register it to receive notifications.
this->setYieldPolicy(policy);
@@ -251,6 +252,7 @@ OperationContext* PlanExecutor::getOpCtx() const {
}
void PlanExecutor::saveState() {
+ invariant(_currentState == kUsable || _currentState == kSaved);
if (!killed()) {
_root->saveState();
}
@@ -264,12 +266,12 @@ void PlanExecutor::saveState() {
WorkingSetCommon::prepareForSnapshotChange(_workingSet.get());
}
- _opCtx = NULL;
+ _currentState = kSaved;
}
-bool PlanExecutor::restoreState(OperationContext* opCtx) {
+bool PlanExecutor::restoreState() {
try {
- return restoreStateWithoutRetrying(opCtx);
+ return restoreStateWithoutRetrying();
} catch (const WriteConflictException& wce) {
if (!_yieldPolicy->allowedToYield())
throw;
@@ -279,23 +281,35 @@ bool PlanExecutor::restoreState(OperationContext* opCtx) {
}
}
-bool PlanExecutor::restoreStateWithoutRetrying(OperationContext* opCtx) {
- invariant(NULL == _opCtx);
- invariant(opCtx);
-
- _opCtx = opCtx;
-
+bool PlanExecutor::restoreStateWithoutRetrying() {
+ invariant(_currentState == kSaved);
// We're restoring after a yield or getMore now. If we're a yielding plan executor, reset
// the yield timer in order to prevent from yielding again right away.
_yieldPolicy->resetTimer();
if (!killed()) {
- _root->restoreState(opCtx);
+ _root->restoreState();
}
+ _currentState = kUsable;
return !killed();
}
+void PlanExecutor::detachFromOperationContext() {
+ invariant(_currentState == kSaved);
+ _opCtx = nullptr;
+ _root->detachFromOperationContext();
+ _currentState = kDetached;
+ _everDetachedFromOperationContext = true;
+}
+
+void PlanExecutor::reattachToOperationContext(OperationContext* txn) {
+ invariant(_currentState == kDetached);
+ _opCtx = txn;
+ _root->reattachToOperationContext(txn);
+ _currentState = kSaved;
+}
+
void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) {
if (!killed()) {
_root->invalidate(txn, dl, type);
@@ -304,7 +318,7 @@ void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, Invalid
PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) {
Snapshotted<BSONObj> snapshotted;
- ExecState state = getNextSnapshotted(objOut ? &snapshotted : NULL, dlOut);
+ ExecState state = getNextImpl(objOut ? &snapshotted : NULL, dlOut);
if (objOut) {
*objOut = snapshotted.value();
@@ -315,6 +329,13 @@ PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut)
PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut,
RecordId* dlOut) {
+ // Detaching from the OperationContext means that the returned snapshot ids could be invalid.
+ invariant(!_everDetachedFromOperationContext);
+ return getNextImpl(objOut, dlOut);
+}
+
+PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) {
+ invariant(_currentState == kUsable);
if (killed()) {
if (NULL != objOut) {
Status status(ErrorCodes::OperationFailed,
@@ -453,6 +474,7 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
}
bool PlanExecutor::isEOF() {
+ invariant(_currentState == kUsable);
return killed() || (_stash.empty() && _root->isEOF());
}
@@ -493,6 +515,7 @@ void PlanExecutor::kill(string reason) {
}
Status PlanExecutor::executePlan() {
+ invariant(_currentState == kUsable);
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
while (PlanExecutor::ADVANCED == state) {
diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h
index 83e0c7eb6cd..ed21f8b047a 100644
--- a/src/mongo/db/query/plan_executor.h
+++ b/src/mongo/db/query/plan_executor.h
@@ -228,9 +228,10 @@ public:
//
/**
- * Save any state required to either
- * 1. hibernate waiting for a getMore, or
- * 2. yield the lock (on applicable storage engines) to allow writes to proceed.
+ * Save any state required to recover from changes to the underlying collection's data.
+ *
+ * While in the "saved" state, it is only legal to call restoreState,
+ * detachFromOperationContext, or the destructor.
*/
void saveState();
@@ -244,7 +245,24 @@ public:
*
* Returns false otherwise. The execution tree cannot be worked and should be deleted.
*/
- bool restoreState(OperationContext* opCtx);
+ bool restoreState();
+
+ /**
+ * Detaches from the OperationContext and releases any storage-engine state.
+ *
+ * It is only legal to call this when in a "saved" state. While in the "detached" state, it is
+ * only legal to call reattachToOperationContext or the destructor. It is not legal to call
+ * detachFromOperationContext() while already in the detached state.
+ */
+ void detachFromOperationContext();
+
+ /**
+ * Reattaches to the OperationContext and reacquires any storage-engine state.
+ *
+ * It is only legal to call this in the "detached" state. On return, the cursor is left in a
+ * "saved" state, so callers must still call restoreState to use this object.
+ */
+ void reattachToOperationContext(OperationContext* opCtx);
/**
* Same as restoreState but without the logic to retry if a WriteConflictException is
@@ -252,7 +270,7 @@ public:
*
* This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE.
*/
- bool restoreStateWithoutRetrying(OperationContext* opCtx);
+ bool restoreStateWithoutRetrying();
//
// Running Support
@@ -351,6 +369,8 @@ public:
void enqueue(const BSONObj& obj);
private:
+ ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut);
+
/**
* RAII approach to ensuring that plan executors are deregistered.
*
@@ -441,6 +461,10 @@ private:
// to consume yet. We empty the queue before retrieving further results from the plan
// stages.
std::queue<BSONObj> _stash;
+
+ enum { kUsable, kSaved, kDetached } _currentState = kUsable;
+
+ bool _everDetachedFromOperationContext = false;
};
} // namespace mongo
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index 45d996893a9..da41e863db1 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -97,7 +97,7 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) {
QueryYield::yieldAllLocks(opCtx, fetcher);
}
- return _planYielding->restoreStateWithoutRetrying(opCtx);
+ return _planYielding->restoreStateWithoutRetrying();
} catch (const WriteConflictException& wce) {
CurOp::get(opCtx)->debug().writeConflicts++;
WriteConflictException::logAndBackoff(
diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp
index a3de8953291..392c81a080c 100644
--- a/src/mongo/db/repair_database.cpp
+++ b/src/mongo/db/repair_database.cpp
@@ -143,7 +143,7 @@ Status rebuildIndexesOnCollection(OperationContext* txn,
rs->deleteRecord(txn, id);
wunit.commit();
}
- cursor->restore(txn);
+ cursor->restore();
continue;
}
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index 25ebf6a5de6..25a61a9edb2 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -47,9 +47,11 @@ public:
return {};
}
void savePositioned() final {}
- bool restore(OperationContext* txn) final {
+ bool restore() final {
return true;
}
+ void detachFromOperationContext() final {}
+ void reattachToOperationContext(OperationContext* txn) final {}
};
class DevNullRecordStore : public RecordStore {
diff --git a/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp b/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp
index f40dff8e7ff..30cabb59716 100644
--- a/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp
+++ b/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp
@@ -299,14 +299,11 @@ public:
}
void saveUnpositioned() override {
- _txn = nullptr;
_savedAtEnd = true;
// Doing nothing with end cursor since it will do full reseek on restore.
}
- void restore(OperationContext* txn) override {
- _txn = txn;
-
+ void restore() override {
// Always do a full seek on restore. We cannot use our last position since index
// entries may have been inserted closer to our endpoint and we would need to move
// over them.
@@ -324,6 +321,14 @@ public:
|| _data.value_comp().compare(*_it, {_savedKey, _savedLoc}) != 0;
}
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ }
+
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ }
+
private:
bool atEndPoint() const {
return _endState && _it == _endState->it;
diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
index af596f7a569..7c3b708d513 100644
--- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
+++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
@@ -112,7 +112,7 @@ private:
class InMemoryRecordStore::Cursor final : public RecordCursor {
public:
Cursor(OperationContext* txn, const InMemoryRecordStore& rs)
- : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {}
+ : _records(rs._data->records), _isCapped(rs.isCapped()) {}
boost::optional<Record> next() final {
if (_needFirstSeek) {
@@ -138,18 +138,15 @@ public:
}
void savePositioned() final {
- _txn = nullptr;
if (!_needFirstSeek && !_lastMoveWasRestore)
_savedId = _it == _records.end() ? RecordId() : _it->first;
}
void saveUnpositioned() final {
- _txn = nullptr;
_savedId = RecordId();
}
- bool restore(OperationContext* txn) final {
- _txn = txn;
+ bool restore() final {
if (_savedId.isNull()) {
_it = _records.end();
return true;
@@ -162,8 +159,10 @@ public:
return !(_isCapped && _lastMoveWasRestore);
}
+ void detachFromOperationContext() final {}
+ void reattachToOperationContext(OperationContext* txn) final {}
+
private:
- unowned_ptr<OperationContext> _txn;
Records::const_iterator _it;
bool _needFirstSeek = true;
bool _lastMoveWasRestore = false;
@@ -176,7 +175,7 @@ private:
class InMemoryRecordStore::ReverseCursor final : public RecordCursor {
public:
ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs)
- : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {}
+ : _records(rs._data->records), _isCapped(rs.isCapped()) {}
boost::optional<Record> next() final {
if (_needFirstSeek) {
@@ -212,18 +211,15 @@ public:
}
void savePositioned() final {
- _txn = nullptr;
if (!_needFirstSeek && !_lastMoveWasRestore)
_savedId = _it == _records.rend() ? RecordId() : _it->first;
}
void saveUnpositioned() final {
- _txn = nullptr;
_savedId = RecordId();
}
- bool restore(OperationContext* txn) final {
- _txn = txn;
+ bool restore() final {
if (_savedId.isNull()) {
_it = _records.rend();
return true;
@@ -239,8 +235,10 @@ public:
return !(_isCapped && _lastMoveWasRestore);
}
+ void detachFromOperationContext() final {}
+ void reattachToOperationContext(OperationContext* txn) final {}
+
private:
- unowned_ptr<OperationContext> _txn;
Records::const_reverse_iterator _it;
bool _needFirstSeek = true;
bool _lastMoveWasRestore = false;
diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp b/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp
index ce1aa117fef..1154b296dcf 100644
--- a/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp
+++ b/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp
@@ -197,8 +197,6 @@ public:
}
void savePositioned() override {
- _txn = nullptr;
-
if (!_lastMoveWasRestore)
_savedEOF = isEOF();
@@ -215,7 +213,6 @@ public:
}
void saveUnpositioned() override {
- _txn = nullptr;
// Don't leak our registration if savePositioned() was previously called.
if (!_saved.bucket.isNull())
_btree->savedCursors()->unregisterCursor(&_saved);
@@ -224,11 +221,7 @@ public:
_savedEOF = true;
}
- void restore(OperationContext* txn) override {
- // guard against accidental double restore
- invariant(!_txn);
- _txn = txn;
-
+ void restore() override {
// Always do a full seek on restore. We cannot use our last position since index
// entries may have been inserted closer to our endpoint and we would need to move
// over them.
@@ -251,6 +244,14 @@ public:
|| getDiskLoc() != _saved.loc || compareKeys(getKey(), _saved.key) != 0;
}
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ }
+
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ }
+
private:
bool isEOF() const {
return _bucket.isNull();
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h
index 5c0437cce56..7ba485d2a8b 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h
@@ -336,9 +336,15 @@ public:
boost::optional<Record> seekExact(const RecordId& id) final;
void invalidate(const RecordId& dl) final;
void savePositioned() final {}
- bool restore(OperationContext* txn) final {
+ bool restore() final {
return true;
}
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ }
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ }
std::unique_ptr<RecordFetcher> fetcherForNext() const final;
private:
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp
index 353a7f39c0c..e1487ee20fe 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp
@@ -99,12 +99,9 @@ void CappedRecordStoreV1Iterator::invalidate(const RecordId& id) {
}
}
-void CappedRecordStoreV1Iterator::savePositioned() {
- _txn = nullptr;
-}
+void CappedRecordStoreV1Iterator::savePositioned() {}
-bool CappedRecordStoreV1Iterator::restore(OperationContext* txn) {
- _txn = txn;
+bool CappedRecordStoreV1Iterator::restore() {
return !_killedByInvalidate;
}
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h
index 0a366d9921a..3793c5cce4b 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h
@@ -50,7 +50,13 @@ public:
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final;
void savePositioned() final;
- bool restore(OperationContext* txn) final;
+ bool restore() final;
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ }
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ }
void invalidate(const RecordId& dl) final;
std::unique_ptr<RecordFetcher> fetcherForNext() const final;
std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const final;
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h
index def5178ad8e..eb2dc6cb8e1 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h
@@ -47,12 +47,15 @@ public:
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final;
void invalidate(const RecordId& dl);
- void savePositioned() final {
+ void savePositioned() final {}
+ bool restore() final {
+ return true;
+ }
+ void detachFromOperationContext() final {
_txn = nullptr;
}
- bool restore(OperationContext* txn) final {
+ void reattachToOperationContext(OperationContext* txn) final {
_txn = txn;
- return true;
}
// Explicitly not supporting fetcherForNext(). The expected use case for this class is a
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp
index babfbcf26ea..5d668004b68 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp
@@ -108,12 +108,9 @@ void SimpleRecordStoreV1Iterator::invalidate(const RecordId& dl) {
}
}
-void SimpleRecordStoreV1Iterator::savePositioned() {
- _txn = nullptr;
-}
+void SimpleRecordStoreV1Iterator::savePositioned() {}
-bool SimpleRecordStoreV1Iterator::restore(OperationContext* txn) {
- _txn = txn;
+bool SimpleRecordStoreV1Iterator::restore() {
// if the collection is dropped, then the cursor should be destroyed
return true;
}
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h
index 91b0088bf72..4b74864bf1c 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h
@@ -50,7 +50,13 @@ public:
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final;
void savePositioned() final;
- bool restore(OperationContext* txn) final;
+ bool restore() final;
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ }
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ }
void invalidate(const RecordId& dl) final;
std::unique_ptr<RecordFetcher> fetcherForNext() const final;
std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const final;
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index c3226fa41e1..6d4fd9dae9f 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -196,7 +196,24 @@ public:
*
* This handles restoring after either savePositioned() or saveUnpositioned().
*/
- virtual bool restore(OperationContext* txn) = 0;
+ virtual bool restore() = 0;
+
+ /**
+ * Detaches from the OperationContext and releases any storage-engine state.
+ *
+ * It is only legal to call this when in a "saved" state. While in the "detached" state, it is
+ * only legal to call reattachToOperationContext or the destructor. It is not legal to call
+ * detachFromOperationContext() while already in the detached state.
+ */
+ virtual void detachFromOperationContext() = 0;
+
+ /**
+ * Reattaches to the OperationContext and reacquires any storage-engine state.
+ *
+ * It is only legal to call this in the "detached" state. On return, the cursor is left in a
+ * "saved" state, so callers must still call restoreState to use this object.
+ */
+ virtual void reattachToOperationContext(OperationContext* opCtx) = 0;
/**
* Inform the cursor that this id is being invalidated.
diff --git a/src/mongo/db/storage/record_store_test_recorditer.cpp b/src/mongo/db/storage/record_store_test_recorditer.cpp
index 3be72344c85..7d42c6bfe2a 100644
--- a/src/mongo/db/storage/record_store_test_recorditer.cpp
+++ b/src/mongo/db/storage/record_store_test_recorditer.cpp
@@ -316,7 +316,7 @@ TEST(RecordStoreTestHarness, RecordIteratorEOF) {
ASSERT_OK(res.getStatus());
uow.commit();
- ASSERT(cursor->restore(opCtx.get()));
+ ASSERT(cursor->restore());
// Iterator should still be EOF.
ASSERT(!cursor->next());
@@ -369,7 +369,7 @@ TEST(RecordStoreTestHarness, RecordIteratorSavePositionedRestore) {
for (int i = 0; i < nToInsert; i++) {
cursor->savePositioned();
cursor->savePositioned(); // It is legal to save twice in a row.
- cursor->restore(opCtx.get());
+ cursor->restore();
const auto record = cursor->next();
ASSERT(record);
@@ -379,7 +379,7 @@ TEST(RecordStoreTestHarness, RecordIteratorSavePositionedRestore) {
cursor->savePositioned();
cursor->savePositioned(); // It is legal to save twice in a row.
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT(!cursor->next());
}
diff --git a/src/mongo/db/storage/record_store_test_repairiter.cpp b/src/mongo/db/storage/record_store_test_repairiter.cpp
index 56abdcb6b14..4e21eccc2e1 100644
--- a/src/mongo/db/storage/record_store_test_repairiter.cpp
+++ b/src/mongo/db/storage/record_store_test_repairiter.cpp
@@ -158,7 +158,7 @@ TEST(RecordStoreTestHarness, GetIteratorForRepairInvalidateSingleton) {
// Invalidate the record we're pointing at.
cursor->savePositioned();
cursor->invalidate(idToInvalidate);
- cursor->restore(opCtx.get());
+ cursor->restore();
// Iterator should be EOF now because the only thing in the collection got deleted.
ASSERT(!cursor->next());
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 3bb9d3093ea..d50da13cb95 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -53,9 +53,6 @@ public:
virtual void reportState(BSONObjBuilder* b) const {}
- virtual void beingReleasedFromOperationContext() {}
- virtual void beingSetOnOperationContext() {}
-
/**
* These should be called through WriteUnitOfWork rather than directly.
*
@@ -109,6 +106,13 @@ public:
"Current storage engine does not support $readMajorityTemporaryName"};
}
+ /**
+ * Returns true if setReadFromMajorityCommittedSnapshot() has been called.
+ */
+ virtual bool isReadingFromMajorityCommittedSnapshot() {
+ return false;
+ }
+
virtual SnapshotId getSnapshotId() const = 0;
/**
diff --git a/src/mongo/db/storage/sorted_data_interface.h b/src/mongo/db/storage/sorted_data_interface.h
index 2836c7c4814..e373fa1ff44 100644
--- a/src/mongo/db/storage/sorted_data_interface.h
+++ b/src/mongo/db/storage/sorted_data_interface.h
@@ -327,7 +327,24 @@ public:
*
* This handles restoring after either savePositioned() or saveUnpositioned().
*/
- virtual void restore(OperationContext* txn) = 0;
+ virtual void restore() = 0;
+
+ /**
+ * Detaches from the OperationContext and releases any storage-engine state.
+ *
+ * It is only legal to call this when in a "saved" state. While in the "detached" state, it
+ * is only legal to call reattachToOperationContext or the destructor. It is not legal to
+ * call detachFromOperationContext() while already in the detached state.
+ */
+ virtual void detachFromOperationContext() = 0;
+
+ /**
+ * Reattaches to the OperationContext and reacquires any storage-engine state.
+ *
+ * It is only legal to call this in the "detached" state. On return, the cursor is left in a
+ * "saved" state, so callers must still call restoreState to use this object.
+ */
+ virtual void reattachToOperationContext(OperationContext* opCtx) = 0;
};
/**
diff --git a/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp b/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp
index 190f707c4c0..e0d31fa90d1 100644
--- a/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp
+++ b/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp
@@ -144,7 +144,7 @@ void testSetEndPosition_Seek_Forward(bool unique, bool inclusive) {
cursor->saveUnpositioned();
removeFromIndex(opCtx, sorted, {{key3, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->seek(key2, true), boost::none);
ASSERT_EQ(cursor->seek(key3, true), boost::none);
@@ -192,7 +192,7 @@ void testSetEndPosition_Seek_Reverse(bool unique, bool inclusive) {
cursor->saveUnpositioned();
removeFromIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->seek(key3, true), boost::none);
ASSERT_EQ(cursor->seek(key2, true), boost::none);
@@ -226,7 +226,7 @@ void testSetEndPosition_Restore_Forward(bool unique) {
ASSERT_EQ(cursor->seek(key1, true), IndexKeyEntry(key1, loc1));
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1));
@@ -236,7 +236,7 @@ void testSetEndPosition_Restore_Forward(bool unique) {
{
{key2, loc1}, {key3, loc1},
});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), boost::none);
}
@@ -262,7 +262,7 @@ void testSetEndPosition_Restore_Reverse(bool unique) {
ASSERT_EQ(cursor->seek(key4, true), IndexKeyEntry(key4, loc1));
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(key3, loc1));
@@ -272,7 +272,7 @@ void testSetEndPosition_Restore_Reverse(bool unique) {
{
{key2, loc1}, {key3, loc1},
});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), boost::none);
}
@@ -309,7 +309,7 @@ void testSetEndPosition_RestoreEndCursor_Forward(bool unique) {
{key2, loc1}, // in range
{key3, loc1}, // out of range
});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->seek(key1, true), IndexKeyEntry(key1, loc1));
ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1));
@@ -342,7 +342,7 @@ void testSetEndPosition_RestoreEndCursor_Reverse(bool unique) {
{key2, loc1}, // in range
{key3, loc1}, // out of range
});
- cursor->restore(opCtx.get()); // must restore end cursor even with saveUnpositioned().
+ cursor->restore(); // must restore end cursor even with saveUnpositioned().
ASSERT_EQ(cursor->seek(key4, true), IndexKeyEntry(key4, loc1));
ASSERT_EQ(cursor->next(), IndexKeyEntry(key3, loc1));
diff --git a/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp b/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp
index 679bb3f8c8b..c63e6ea15fe 100644
--- a/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp
+++ b/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp
@@ -75,7 +75,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursor) {
ASSERT_EQ(entry, IndexKeyEntry(BSON("" << i), RecordId(42, i * 2)));
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
}
ASSERT(!cursor->next());
ASSERT_EQ(i, nToInsert);
@@ -121,7 +121,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursorReversed) {
ASSERT_EQ(entry, IndexKeyEntry(BSON("" << i), RecordId(42, i * 2)));
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
}
ASSERT(!cursor->next());
ASSERT_EQ(i, -1);
@@ -166,7 +166,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursorWithDupKeys) {
ASSERT_EQ(entry, IndexKeyEntry(key1, RecordId(42, i * 2)));
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
}
ASSERT(!cursor->next());
ASSERT_EQ(i, nToInsert);
@@ -212,7 +212,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursorWithDupKeysRev
ASSERT_EQ(entry, IndexKeyEntry(key1, RecordId(42, i * 2)));
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
}
ASSERT(!cursor->next());
ASSERT_EQ(i, -1);
@@ -301,7 +301,7 @@ void testSaveAndRestorePositionSeesNewInserts(bool forward, bool unique) {
cursor->savePositioned();
insertToIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1));
}
@@ -335,12 +335,12 @@ void testSaveAndRestorePositionSeesNewInsertsAfterRemove(bool forward, bool uniq
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key1, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
// The restore may have seeked since it can't return to the saved position.
cursor->savePositioned(); // Should still save originally saved key as "current position".
insertToIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1));
}
@@ -375,13 +375,13 @@ void testSaveAndRestorePositionSeesNewInsertsAfterEOF(bool forward, bool unique)
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key1, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
// The restore may have seeked to EOF.
auto insertPoint = forward ? key2 : key0;
cursor->savePositioned(); // Should still save key1 as "current position".
insertToIndex(opCtx, sorted, {{insertPoint, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(insertPoint, loc1));
}
@@ -415,24 +415,24 @@ void testSaveAndRestorePositionConsidersRecordId_Forward(bool unique) {
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key1, loc1}});
insertToIndex(opCtx, sorted, {{key1, loc2}});
- cursor->restore(opCtx.get()); // Lands on inserted key.
+ cursor->restore(); // Lands on inserted key.
ASSERT_EQ(cursor->next(), IndexKeyEntry(key1, loc2));
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key1, loc2}});
insertToIndex(opCtx, sorted, {{key1, loc1}});
- cursor->restore(opCtx.get()); // Lands after inserted.
+ cursor->restore(); // Lands after inserted.
ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1));
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
cursor->savePositioned();
insertToIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get()); // Lands at same point as initial save.
+ cursor->restore(); // Lands at same point as initial save.
// Advances from restore point since restore didn't move position.
ASSERT_EQ(cursor->next(), IndexKeyEntry(key3, loc1));
@@ -460,24 +460,24 @@ void testSaveAndRestorePositionConsidersRecordId_Reverse(bool unique) {
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key2, loc2}});
insertToIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1));
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key2, loc1}});
insertToIndex(opCtx, sorted, {{key2, loc2}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->next(), IndexKeyEntry(key1, loc1));
cursor->savePositioned();
removeFromIndex(opCtx, sorted, {{key1, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
cursor->savePositioned();
insertToIndex(opCtx, sorted, {{key1, loc1}});
- cursor->restore(opCtx.get()); // Lands at same point as initial save.
+ cursor->restore(); // Lands at same point as initial save.
// Advances from restore point since restore didn't move position.
ASSERT_EQ(cursor->next(), IndexKeyEntry(key0, loc1));
@@ -504,12 +504,12 @@ TEST(SortedDataInterface, SaveUnpositionedAndRestore) {
cursor->saveUnpositioned();
removeFromIndex(opCtx, sorted, {{key2, loc1}});
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->seek(key1, true), IndexKeyEntry(key1, loc1));
cursor->saveUnpositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
ASSERT_EQ(cursor->seek(key3, true), IndexKeyEntry(key3, loc1));
}
diff --git a/src/mongo/db/storage/sorted_data_interface_test_harness.cpp b/src/mongo/db/storage/sorted_data_interface_test_harness.cpp
index 13929c7eacc..7947aae82e5 100644
--- a/src/mongo/db/storage/sorted_data_interface_test_harness.cpp
+++ b/src/mongo/db/storage/sorted_data_interface_test_harness.cpp
@@ -359,7 +359,7 @@ TEST(SortedDataInterface, CursorIterate1WithSaveRestore) {
ASSERT_EQ(entry, IndexKeyEntry(BSON("" << n), RecordId(5, n * 2)));
n++;
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
}
ASSERT_EQUALS(N, n);
}
@@ -388,7 +388,7 @@ TEST(SortedDataInterface, CursorIterateAllDupKeysWithSaveRestore) {
ASSERT_EQ(entry, IndexKeyEntry(BSON("" << 5), RecordId(5, n * 2)));
n++;
cursor->savePositioned();
- cursor->restore(opCtx.get());
+ cursor->restore();
}
ASSERT_EQUALS(N, n);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
index bd727575954..d824406dfae 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp
@@ -595,8 +595,9 @@ namespace {
class WiredTigerIndexCursorBase : public SortedDataInterface::Cursor {
public:
WiredTigerIndexCursorBase(const WiredTigerIndex& idx, OperationContext* txn, bool forward)
- : _txn(txn), _cursor(idx.uri(), idx.tableId(), false, txn), _idx(idx), _forward(forward) {}
-
+ : _txn(txn), _idx(idx), _forward(forward) {
+ _cursor.emplace(_idx.uri(), _idx.tableId(), false, _txn);
+ }
boost::optional<IndexKeyEntry> next(RequestedInfo parts) override {
// Advance on a cursor at the end is a no-op
if (_eof)
@@ -654,24 +655,16 @@ public:
}
void savePositioned() override {
- if (!_txn)
- return; // still saved
-
- _savedForCheck = _txn->recoveryUnit();
-
- if (!wt_keeptxnopen()) {
- try {
- _cursor.reset();
- } catch (const WriteConflictException& wce) {
- // Ignore since this is only called when we are about to kill our transaction
- // anyway.
- }
-
- // Our saved position is wherever we were when we last called updatePosition().
- // Any partially completed repositions should not effect our saved position.
+ try {
+ if (_cursor)
+ _cursor->reset();
+ } catch (const WriteConflictException& wce) {
+ // Ignore since this is only called when we are about to kill our transaction
+ // anyway.
}
- _txn = NULL;
+ // Our saved position is wherever we were when we last called updatePosition().
+ // Any partially completed repositions should not effect our saved position.
}
void saveUnpositioned() override {
@@ -679,21 +672,30 @@ public:
_eof = true;
}
- void restore(OperationContext* txn) override {
- // Update the session handle with our new operation context.
- invariant(_savedForCheck == txn->recoveryUnit());
- _txn = txn;
+ void restore() override {
+ if (!_cursor) {
+ _cursor.emplace(_idx.uri(), _idx.tableId(), false, _txn);
+ }
- if (!wt_keeptxnopen()) {
- if (!_eof) {
- // Ensure an active session exists, so any restored cursors will bind to it
- WiredTigerRecoveryUnit::get(txn)->getSession(txn);
- _lastMoveWasRestore = !seekWTCursor(_key);
- TRACE_CURSOR << "restore _lastMoveWasRestore:" << _lastMoveWasRestore;
- }
+ // Ensure an active session exists, so any restored cursors will bind to it
+ invariant(WiredTigerRecoveryUnit::get(_txn)->getSession(_txn) == _cursor->getSession());
+
+ if (!_eof) {
+ _lastMoveWasRestore = !seekWTCursor(_key);
+ TRACE_CURSOR << "restore _lastMoveWasRestore:" << _lastMoveWasRestore;
}
}
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ _cursor = {};
+ }
+
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ // _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues.
+ }
+
protected:
// Called after _key has been filled in. Must not throw WriteConflictException.
virtual void updateLocAndTypeBits() = 0;
@@ -738,7 +740,7 @@ protected:
}
void advanceWTCursor() {
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
int ret = WT_OP_CHECK(_forward ? c->next(c) : c->prev(c));
if (ret == WT_NOTFOUND) {
_cursorAtEof = true;
@@ -750,7 +752,7 @@ protected:
// Seeks to query. Returns true on exact match.
bool seekWTCursor(const KeyString& query) {
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
int cmp = -1;
const WiredTigerItem keyItem(query.getBuffer(), query.getSize());
@@ -795,7 +797,7 @@ protected:
_eof = false;
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
WT_ITEM item;
invariantWTOK(c->get_key(c, &item));
_key.resetFromBuffer(item.data, item.size);
@@ -809,13 +811,10 @@ protected:
}
OperationContext* _txn;
- WiredTigerCursor _cursor;
+ boost::optional<WiredTigerCursor> _cursor;
const WiredTigerIndex& _idx; // not owned
const bool _forward;
- // Ensures we have the same RU at restore time.
- RecoveryUnit* _savedForCheck;
-
// These are where this cursor instance is. They are not changed in the face of a failing
// next().
KeyString _key;
@@ -844,7 +843,7 @@ public:
void updateLocAndTypeBits() override {
_loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize());
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
WT_ITEM item;
invariantWTOK(c->get_value(c, &item));
BufReader br(item.data, item.size);
@@ -857,8 +856,8 @@ public:
WiredTigerIndexUniqueCursor(const WiredTigerIndex& idx, OperationContext* txn, bool forward)
: WiredTigerIndexCursorBase(idx, txn, forward) {}
- void restore(OperationContext* txn) override {
- WiredTigerIndexCursorBase::restore(txn);
+ void restore() override {
+ WiredTigerIndexCursorBase::restore();
// In addition to seeking to the correct key, we also need to make sure that the loc is
// on the correct side of _loc.
@@ -869,7 +868,7 @@ public:
// If we get here we need to look at the actual RecordId for this key and make sure we
// are supposed to see it.
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
WT_ITEM item;
invariantWTOK(c->get_value(c, &item));
@@ -893,7 +892,7 @@ public:
// We assume that cursors can only ever see unique indexes in their "pristine" state,
// where no duplicates are possible. The cases where dups are allowed should hold
// sufficient locks to ensure that no cursor ever sees them.
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
WT_ITEM item;
invariantWTOK(c->get_value(c, &item));
@@ -912,7 +911,7 @@ public:
_query.resetToKey(stripFieldNames(key), _idx.ordering());
const WiredTigerItem keyItem(_query.getBuffer(), _query.getSize());
- WT_CURSOR* c = _cursor.get();
+ WT_CURSOR* c = _cursor->get();
c->set_key(c, keyItem.Get());
// Using search rather than search_near.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index c423cd3e1f9..9008eb4ce2a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -93,16 +93,13 @@ const std::string kWiredTigerEngineName = "wiredTiger";
class WiredTigerRecordStore::Cursor final : public RecordCursor {
public:
- Cursor(OperationContext* txn,
- const WiredTigerRecordStore& rs,
- bool forward = true,
- bool forParallelCollectionScan = false)
+ Cursor(OperationContext* txn, const WiredTigerRecordStore& rs, bool forward = true)
: _rs(rs),
_txn(txn),
_forward(forward),
- _forParallelCollectionScan(forParallelCollectionScan),
- _cursor(new WiredTigerCursor(rs.getURI(), rs.tableId(), true, txn)),
- _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) {}
+ _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) {
+ _cursor.emplace(rs.getURI(), rs.tableId(), true, txn);
+ }
boost::optional<Record> next() final {
if (_eof)
@@ -186,27 +183,13 @@ public:
}
void savePositioned() final {
- // It must be safe to call save() twice in a row without calling restore().
- if (!_txn)
- return;
-
- // the cursor and recoveryUnit are valid on restore
- // so we just record the recoveryUnit to make sure
- _savedRecoveryUnit = _txn->recoveryUnit();
- if (_cursor && !wt_keeptxnopen()) {
- try {
+ try {
+ if (_cursor)
_cursor->reset();
- } catch (const WriteConflictException& wce) {
- // Ignore since this is only called when we are about to kill our transaction
- // anyway.
- }
+ } catch (const WriteConflictException& wce) {
+ // Ignore since this is only called when we are about to kill our transaction
+ // anyway.
}
-
- if (_forParallelCollectionScan) {
- // Delete the cursor since we may come back to a different RecoveryUnit
- _cursor.reset();
- }
- _txn = nullptr;
}
void saveUnpositioned() final {
@@ -214,31 +197,20 @@ public:
_lastReturnedId = RecordId();
}
- bool restore(OperationContext* txn) final {
- _txn = txn;
+ bool restore() final {
+ if (!_cursor)
+ _cursor.emplace(_rs.getURI(), _rs.tableId(), true, _txn);
+
+ // This will ensure an active session exists, so any restored cursors will bind to it
+ invariant(WiredTigerRecoveryUnit::get(_txn)->getSession(_txn) == _cursor->getSession());
// If we've hit EOF, then this iterator is done and need not be restored.
if (_eof)
return true;
- bool needRestore = false;
-
- if (_forParallelCollectionScan) {
- needRestore = true;
- _savedRecoveryUnit = txn->recoveryUnit();
- _cursor.reset(new WiredTigerCursor(_rs.getURI(), _rs.tableId(), true, txn));
- _forParallelCollectionScan = false; // we only do this the first time
- }
- invariant(_savedRecoveryUnit == txn->recoveryUnit());
-
- if (!needRestore && wt_keeptxnopen())
- return true;
if (_lastReturnedId.isNull())
return true;
- // This will ensure an active session exists, so any restored cursors will bind to it
- invariant(WiredTigerRecoveryUnit::get(txn)->getSession(txn) == _cursor->getSession());
-
WT_CURSOR* c = _cursor->get();
c->set_key(c, _makeKey(_lastReturnedId));
@@ -276,6 +248,16 @@ public:
return true;
}
+ void detachFromOperationContext() final {
+ _txn = nullptr;
+ _cursor = {};
+ }
+
+ void reattachToOperationContext(OperationContext* txn) final {
+ _txn = txn;
+ // _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues.
+ }
+
private:
bool isVisible(const RecordId& id) {
if (!_rs._isCapped)
@@ -297,10 +279,8 @@ private:
const WiredTigerRecordStore& _rs;
OperationContext* _txn;
- RecoveryUnit* _savedRecoveryUnit; // only used to sanity check between save/restore.
const bool _forward;
- bool _forParallelCollectionScan; // This can go away once SERVER-17364 is resolved.
- std::unique_ptr<WiredTigerCursor> _cursor;
+ boost::optional<WiredTigerCursor> _cursor;
bool _eof = false;
RecordId _lastReturnedId; // If null, need to seek to first/last record.
const RecordId _readUntilForOplog;
@@ -906,10 +886,7 @@ std::unique_ptr<RecordCursor> WiredTigerRecordStore::getCursor(OperationContext*
std::vector<std::unique_ptr<RecordCursor>> WiredTigerRecordStore::getManyCursors(
OperationContext* txn) const {
std::vector<std::unique_ptr<RecordCursor>> cursors(1);
- cursors[0] = stdx::make_unique<Cursor>(txn,
- *this,
- /*forward=*/true,
- /*forParallelCollectionScan=*/true);
+ cursors[0] = stdx::make_unique<Cursor>(txn, *this, /*forward=*/true);
return cursors;
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 2dd31906af8..38447246664 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -735,7 +735,7 @@ TEST(WiredTigerRecordStoreTest, CappedCursorRollover) {
}
// cursor should now be dead
- ASSERT_FALSE(cursor->restore(cursorCtx.get()));
+ ASSERT_FALSE(cursor->restore());
ASSERT(!cursor->next());
}
@@ -871,7 +871,7 @@ TEST(WiredTigerRecordStoreTest, CappedCursorYieldFirst) {
// See that things work if you yield before you first call getNext().
cursor->savePositioned();
cursorCtx->recoveryUnit()->abandonSnapshot();
- ASSERT_TRUE(cursor->restore(cursorCtx.get()));
+ ASSERT_TRUE(cursor->restore());
auto record = cursor->next();
ASSERT_EQ(loc1, record->id);
ASSERT(!cursor->next());
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 8ee03af4311..cdc9981df78 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -378,19 +378,6 @@ void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) {
_active = true;
}
-void WiredTigerRecoveryUnit::beingReleasedFromOperationContext() {
- LOG(2) << "WiredTigerRecoveryUnit::beingReleased";
- _currentlySquirreled = true;
- if (_active == false && !wt_keeptxnopen()) {
- _commit();
- }
-}
-void WiredTigerRecoveryUnit::beingSetOnOperationContext() {
- LOG(2) << "WiredTigerRecoveryUnit::broughtBack";
- _currentlySquirreled = false;
-}
-
-
// ---------------------
WiredTigerCursor::WiredTigerCursor(const std::string& uri,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index d89ef1f3964..edd2085db29 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -65,9 +65,6 @@ public:
virtual void registerChange(Change*);
- virtual void beingReleasedFromOperationContext();
- virtual void beingSetOnOperationContext();
-
virtual void abandonSnapshot();
// un-used API
@@ -80,6 +77,9 @@ public:
virtual SnapshotId getSnapshotId() const;
Status setReadFromMajorityCommittedSnapshot() final;
+ bool isReadingFromMajorityCommittedSnapshot() final {
+ return _readFromMajorityCommittedSnapshot;
+ }
// ---- WT STUFF
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index 383f276d5a3..c05e886468d 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -304,7 +304,7 @@ private:
}
++numDeleted;
ttlDeletedDocuments.increment();
- if (!exec->restoreState(txn)) {
+ if (!exec->restoreState()) {
return true;
}
}
diff --git a/src/mongo/dbtests/executor_registry.cpp b/src/mongo/dbtests/executor_registry.cpp
index dc02f95c58f..cfa67a3bf29 100644
--- a/src/mongo/dbtests/executor_registry.cpp
+++ b/src/mongo/dbtests/executor_registry.cpp
@@ -157,7 +157,7 @@ public:
deregisterExecutor(run.get());
// And clean up anything that happened before.
- run->restoreState(&_opCtx);
+ run->restoreState();
// Make sure that the runner moved forward over the deleted data. We don't see foo==10
// or foo==11.
@@ -192,7 +192,7 @@ public:
// Unregister and restore state.
deregisterExecutor(run.get());
- run->restoreState(&_opCtx);
+ run->restoreState();
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(10, obj["foo"].numberInt());
@@ -206,7 +206,7 @@ public:
// Unregister and restore state.
deregisterExecutor(run.get());
- run->restoreState(&_opCtx);
+ run->restoreState();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
@@ -237,7 +237,7 @@ public:
// Unregister and restore state.
deregisterExecutor(run.get());
- run->restoreState(&_opCtx);
+ run->restoreState();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
@@ -268,7 +268,7 @@ public:
// Unregister and restore state.
deregisterExecutor(run.get());
- run->restoreState(&_opCtx);
+ run->restoreState();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
@@ -300,7 +300,7 @@ public:
// Unregister and restore state.
deregisterExecutor(run.get());
- run->restoreState(&_opCtx);
+ run->restoreState();
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(10, obj["foo"].numberInt());
@@ -316,7 +316,7 @@ public:
// Unregister and restore state.
deregisterExecutor(run.get());
- run->restoreState(&_opCtx);
+ run->restoreState();
_ctx.reset();
// PlanExecutor was killed.
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index e24638ee5c7..e557cf844e7 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -419,7 +419,7 @@ public:
PlanExecutor* exec = makeCollScanExec(coll, filterObj);
// Make a client cursor from the runner.
- new ClientCursor(coll->getCursorManager(), exec, ns(), 0, BSONObj());
+ new ClientCursor(coll->getCursorManager(), exec, ns(), false, 0, BSONObj());
// There should be one cursor before invalidation,
// and zero cursors after invalidation.
@@ -446,7 +446,7 @@ public:
// Make a client cursor from the runner.
ClientCursor* cc =
- new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj());
+ new ClientCursor(collection->getCursorManager(), exec, ns(), false, 0, BSONObj());
ClientCursorPin ccPin(collection->getCursorManager(), cc->cursorid());
// If the cursor is pinned, it sticks around,
@@ -490,7 +490,7 @@ public:
PlanExecutor* exec = makeCollScanExec(collection, filterObj);
// Make a client cursor from the runner.
- new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj());
+ new ClientCursor(collection->getCursorManager(), exec, ns(), false, 0, BSONObj());
}
// There should be one cursor before timeout,
diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp
index f1af8d32704..22fcdc5ad70 100644
--- a/src/mongo/dbtests/query_stage_and.cpp
+++ b/src/mongo/dbtests/query_stage_and.cpp
@@ -223,7 +223,7 @@ public:
}
}
size_t memUsageAfter = ah->getMemUsage();
- ah->restoreState(&_txn);
+ ah->restoreState();
// Invalidating a read object should decrease memory usage.
ASSERT_LESS_THAN(memUsageAfter, memUsageBefore);
@@ -331,7 +331,7 @@ public:
// Look ahead results do not count towards memory usage.
ASSERT_EQUALS(memUsageBefore, memUsageAfter);
- ah->restoreState(&_txn);
+ ah->restoreState();
// The deleted obj should show up in flagged.
ASSERT_EQUALS(size_t(1), flagged.size());
@@ -1056,7 +1056,7 @@ public:
ah->saveState();
ah->invalidate(&_txn, *data.begin(), INVALIDATION_DELETION);
remove(coll->docFor(&_txn, *data.begin()).value());
- ah->restoreState(&_txn);
+ ah->restoreState();
// Make sure the nuked obj is actually in the flagged data.
ASSERT_EQUALS(ws.getFlagged().size(), size_t(1));
@@ -1099,7 +1099,7 @@ public:
ah->saveState();
ah->invalidate(&_txn, *it, INVALIDATION_DELETION);
remove(coll->docFor(&_txn, *it).value());
- ah->restoreState(&_txn);
+ ah->restoreState();
// Get all results aside from the two we killed.
while (!ah->isEOF()) {
diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp
index 33341c12b5e..eea912d4bd4 100644
--- a/src/mongo/dbtests/query_stage_collscan.cpp
+++ b/src/mongo/dbtests/query_stage_collscan.cpp
@@ -301,7 +301,7 @@ public:
scan->saveState();
scan->invalidate(&_txn, locs[count], INVALIDATION_DELETION);
remove(coll->docFor(&_txn, locs[count]).value());
- scan->restoreState(&_txn);
+ scan->restoreState();
// Skip over locs[count].
++count;
@@ -362,7 +362,7 @@ public:
scan->saveState();
scan->invalidate(&_txn, locs[count], INVALIDATION_DELETION);
remove(coll->docFor(&_txn, locs[count]).value());
- scan->restoreState(&_txn);
+ scan->restoreState();
// Skip over locs[count].
++count;
diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp
index 82952021dca..b1273f6f7f8 100644
--- a/src/mongo/dbtests/query_stage_count.cpp
+++ b/src/mongo/dbtests/query_stage_count.cpp
@@ -182,7 +182,7 @@ public:
}
// resume from yield
- count_stage.restoreState(&_txn);
+ count_stage.restoreState();
}
return static_cast<const CountStats*>(count_stage.getSpecificStats());
diff --git a/src/mongo/dbtests/query_stage_count_scan.cpp b/src/mongo/dbtests/query_stage_count_scan.cpp
index 012a2442bad..e499e06cbdc 100644
--- a/src/mongo/dbtests/query_stage_count_scan.cpp
+++ b/src/mongo/dbtests/query_stage_count_scan.cpp
@@ -329,7 +329,7 @@ public:
count.saveState();
// Recover from yield
- count.restoreState(&_txn);
+ count.restoreState();
// finish counting
while (PlanStage::IS_EOF != countState) {
@@ -385,7 +385,7 @@ public:
remove(BSON("a" << GTE << 5));
// Recover from yield
- count.restoreState(&_txn);
+ count.restoreState();
// finish counting
while (PlanStage::IS_EOF != countState) {
@@ -444,7 +444,7 @@ public:
insert(BSON("a" << 6.5));
// Recover from yield
- count.restoreState(&_txn);
+ count.restoreState();
// finish counting
while (PlanStage::IS_EOF != countState) {
@@ -500,7 +500,7 @@ public:
insert(BSON("a" << BSON_ARRAY(10 << 11)));
// Recover from yield
- count.restoreState(&_txn);
+ count.restoreState();
// finish counting
while (PlanStage::IS_EOF != countState) {
@@ -623,7 +623,7 @@ public:
remove(BSON("a" << 1 << "b" << 5));
// Recover from yield
- count.restoreState(&_txn);
+ count.restoreState();
// finish counting
while (PlanStage::IS_EOF != countState) {
diff --git a/src/mongo/dbtests/query_stage_delete.cpp b/src/mongo/dbtests/query_stage_delete.cpp
index 5c7e888e552..5aeea451a22 100644
--- a/src/mongo/dbtests/query_stage_delete.cpp
+++ b/src/mongo/dbtests/query_stage_delete.cpp
@@ -168,7 +168,7 @@ public:
BSONObj targetDoc = coll->docFor(&_txn, locs[targetDocIndex]).value();
ASSERT(!targetDoc.isEmpty());
remove(targetDoc);
- deleteStage.restoreState(&_txn);
+ deleteStage.restoreState();
// Remove the rest.
while (!deleteStage.isEOF()) {
diff --git a/src/mongo/dbtests/query_stage_ixscan.cpp b/src/mongo/dbtests/query_stage_ixscan.cpp
index d6fcaf177d4..7295802d730 100644
--- a/src/mongo/dbtests/query_stage_ixscan.cpp
+++ b/src/mongo/dbtests/query_stage_ixscan.cpp
@@ -195,7 +195,7 @@ public:
ixscan->saveState();
insert(fromjson("{_id: 4, x: 10}"));
insert(fromjson("{_id: 5, x: 11}"));
- ixscan->restoreState(&_txn);
+ ixscan->restoreState();
member = getNext(ixscan.get());
ASSERT_EQ(WorkingSetMember::LOC_AND_IDX, member->getState());
@@ -228,7 +228,7 @@ public:
// Save state and insert an indexed doc.
ixscan->saveState();
insert(fromjson("{_id: 4, x: 7}"));
- ixscan->restoreState(&_txn);
+ ixscan->restoreState();
member = getNext(ixscan.get());
ASSERT_EQ(WorkingSetMember::LOC_AND_IDX, member->getState());
@@ -261,7 +261,7 @@ public:
// Save state and insert an indexed doc.
ixscan->saveState();
insert(fromjson("{_id: 4, x: 10}"));
- ixscan->restoreState(&_txn);
+ ixscan->restoreState();
// Ensure that we're EOF and we don't erroneously return {'': 12}.
WorkingSetID id;
@@ -295,7 +295,7 @@ public:
ixscan->saveState();
insert(fromjson("{_id: 4, x: 6}"));
insert(fromjson("{_id: 5, x: 9}"));
- ixscan->restoreState(&_txn);
+ ixscan->restoreState();
// Ensure that we don't erroneously return {'': 9} or {'':3}.
member = getNext(ixscan.get());
diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp
index 55f27b3059e..e2d8f41ed39 100644
--- a/src/mongo/dbtests/query_stage_merge_sort.cpp
+++ b/src/mongo/dbtests/query_stage_merge_sort.cpp
@@ -587,7 +587,7 @@ public:
// Invalidate locs[11]. Should force a fetch. We don't get it back.
ms->saveState();
ms->invalidate(&_txn, *it, INVALIDATION_DELETION);
- ms->restoreState(&_txn);
+ ms->restoreState();
// Make sure locs[11] was fetched for us.
{
diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp
index b28abb5b1d5..d6f9ab81d1c 100644
--- a/src/mongo/dbtests/query_stage_sort.cpp
+++ b/src/mongo/dbtests/query_stage_sort.cpp
@@ -345,7 +345,7 @@ public:
coll->updateDocument(&_txn, *it, oldDoc, newDoc, false, false, NULL, args);
wuow.commit();
}
- exec->restoreState(&_txn);
+ exec->restoreState();
// Read the rest of the data from the queued data stage.
while (!ms->isEOF()) {
@@ -364,7 +364,7 @@ public:
wuow.commit();
}
}
- exec->restoreState(&_txn);
+ exec->restoreState();
// Verify that it's sorted, the right number of documents are returned, and they're all
// in the expected range.
@@ -444,7 +444,7 @@ public:
coll->deleteDocument(&_txn, *it++, false, false, NULL);
wuow.commit();
}
- exec->restoreState(&_txn);
+ exec->restoreState();
// Read the rest of the data from the queued data stage.
while (!ms->isEOF()) {
@@ -461,7 +461,7 @@ public:
wuow.commit();
}
}
- exec->restoreState(&_txn);
+ exec->restoreState();
// Regardless of storage engine, all the documents should come back with their objects
int count = 0;
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp
index 3290e40ad6c..330acf33b8f 100644
--- a/src/mongo/dbtests/query_stage_update.cpp
+++ b/src/mongo/dbtests/query_stage_update.cpp
@@ -318,7 +318,7 @@ public:
BSONObj targetDoc = coll->docFor(&_txn, locs[targetDocIndex]).value();
ASSERT(!targetDoc.isEmpty());
remove(targetDoc);
- updateStage->restoreState(&_txn);
+ updateStage->restoreState();
// Do the remaining updates.
while (!updateStage->isEOF()) {