diff options
author | Mathias Stearn <mathias@10gen.com> | 2015-07-06 19:16:30 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2015-07-16 14:37:02 -0400 |
commit | b9d2e18ca68246e5d21ed42a846ff4094867f159 (patch) | |
tree | cdbbac6dc5ee00404cf6452f5dd70612983127e3 /src | |
parent | c832bc753c29f91597b75fa02c0d9019c3c20b0f (diff) | |
download | mongo-b9d2e18ca68246e5d21ed42a846ff4094867f159.tar.gz |
SERVER-17364 Don't stash RecoveryUnits across getMores
We now tell PlanExecutors to detach from their OperationContexts and to shed
all storage engine resources before stashing the ClientCursor. This is a
heavier weight operation than a normal save/restoreState which is no longer
allowed to change the OperationContext.
Diffstat (limited to 'src')
95 files changed, 662 insertions, 621 deletions
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index b1adb472565..e66cfa6b9e9 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -197,7 +197,7 @@ Status cloneCollectionAsCapped(OperationContext* txn, // around call to abandonSnapshot. exec->saveState(); txn->recoveryUnit()->abandonSnapshot(); - exec->restoreState(txn); // Handles any WCEs internally. + exec->restoreState(); // Handles any WCEs internally. } } diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index c642fcb83a5..8f654411988 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -286,7 +286,7 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsO // around call to abandonSnapshot. exec->saveState(); _txn->recoveryUnit()->abandonSnapshot(); - exec->restoreState(_txn); // Handles any WCEs internally. + exec->restoreState(); // Handles any WCEs internally. } } diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 7b45e3444ea..c5499ee7166 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -80,14 +80,15 @@ long long ClientCursor::totalOpen() { ClientCursor::ClientCursor(CursorManager* cursorManager, PlanExecutor* exec, const std::string& ns, + bool isReadCommitted, int qopts, const BSONObj query, bool isAggCursor) : _ns(ns), + _isReadCommitted(isReadCommitted), _cursorManager(cursorManager), _countedYet(false), - _isAggCursor(isAggCursor), - _unownedRU(NULL) { + _isAggCursor(isAggCursor) { _exec.reset(exec); _query = query; _queryOptions = qopts; @@ -99,11 +100,11 @@ ClientCursor::ClientCursor(CursorManager* cursorManager, ClientCursor::ClientCursor(const Collection* collection) : _ns(collection->ns().ns()), + _isReadCommitted(false), _cursorManager(collection->getCursorManager()), _countedYet(false), _queryOptions(QueryOption_NoCursorTimeout), - _isAggCursor(false), - _unownedRU(NULL) { + _isAggCursor(false) { init(); } @@ -197,30 +198,6 @@ void ClientCursor::updateSlaveLocation(OperationContext* txn) { } // -// Storage engine state for getMore. -// - -void ClientCursor::setUnownedRecoveryUnit(RecoveryUnit* ru) { - invariant(!_unownedRU); - invariant(!_ownedRU.get()); - _unownedRU = ru; -} - -RecoveryUnit* ClientCursor::getUnownedRecoveryUnit() const { - return _unownedRU; -} - -void ClientCursor::setOwnedRecoveryUnit(RecoveryUnit* ru) { - invariant(!_unownedRU); - invariant(!_ownedRU.get()); - _ownedRU.reset(ru); -} - -RecoveryUnit* ClientCursor::releaseOwnedRecoveryUnit() { - return _ownedRU.release(); -} - -// // Pin methods // diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 3f7e9797ae5..832d24e9a87 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -59,12 +59,15 @@ public: ClientCursor(CursorManager* cursorManager, PlanExecutor* exec, const std::string& ns, + bool isReadCommitted, int qopts = 0, const BSONObj query = BSONObj(), bool isAggCursor = false); /** * This ClientCursor is used to track sharding state for the given collection. + * + * Do not use outside of RangePreserver! */ explicit ClientCursor(const Collection* collection); @@ -81,6 +84,9 @@ public: CursorManager* cursorManager() const { return _cursorManager; } + bool isReadCommitted() const { + return _isReadCommitted; + } bool isAggCursor() const { return _isAggCursor; } @@ -180,49 +186,6 @@ public: static long long totalOpen(); - // - // Storage engine state for getMore. - // - - bool hasRecoveryUnit() const { - return _ownedRU.get() || _unownedRU; - } - - /** - * - * If a ClientCursor is created via DBDirectClient, it uses the same storage engine - * context as the DBDirectClient caller. We store this context in _unownedRU. We use - * this to verify that all further callers use the same RecoveryUnit. - * - * Once a ClientCursor has an unowned RecoveryUnit, it will always have one. - * - * Sets the unowned RecoveryUnit to 'ru'. Does NOT take ownership of the pointer. - */ - void setUnownedRecoveryUnit(RecoveryUnit* ru); - - /** - * Return the unowned RecoveryUnit. 'this' does not own pointer and therefore cannot - * transfer ownership. - */ - RecoveryUnit* getUnownedRecoveryUnit() const; - - /** - * If a ClientCursor is created via a client request, we bind its lifetime to the - * ClientCursor's by storing it un _ownedRU. In order to execute the query over repeated - * network requests, we have to keep the execution state around. - */ - - /** - * Set the owned recovery unit to 'ru'. Takes ownership of it. If there is a previous - * owned recovery unit, it is deleted. - */ - void setOwnedRecoveryUnit(RecoveryUnit* ru); - - /** - * Returns the owned recovery unit. Ownership is transferred to the caller. - */ - RecoveryUnit* releaseOwnedRecoveryUnit(); - private: friend class CursorManager; friend class ClientCursorPin; @@ -248,6 +211,8 @@ private: // The namespace we're operating on. std::string _ns; + const bool _isReadCommitted; + CursorManager* _cursorManager; // if we've added it to the total open counter yet @@ -270,7 +235,7 @@ private: // should not be killed or destroyed when the underlying collection is deleted. // // Note: This should *not* be set for the internal cursor used as input to an aggregation. - bool _isAggCursor; + const bool _isAggCursor; // Is this cursor in use? Defaults to false. bool _isPinned; @@ -288,13 +253,6 @@ private: // TODO: Document. uint64_t _leftoverMaxTimeMicros; - // Only one of these is not-NULL. - RecoveryUnit* _unownedRU; - std::unique_ptr<RecoveryUnit> _ownedRU; - // NOTE: _ownedRU must come before _exec, because _ownedRU must outlive _exec. - // The storage engine can have resources in the PlanExecutor that rely on - // the RecoveryUnit being alive. - // // The underlying execution machinery. // diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index f25010c3ce0..36d9e6f4f82 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -285,11 +285,13 @@ public: // Create a ClientCursor containing this plan executor. We don't have to worry // about leaking it as it's inserted into a global map by its ctor. - ClientCursor* cursor = new ClientCursor(collection->getCursorManager(), - exec.release(), - nss.ns(), - pq.getOptions(), - pq.getFilter()); + ClientCursor* cursor = + new ClientCursor(collection->getCursorManager(), + exec.release(), + nss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + pq.getOptions(), + pq.getFilter()); CursorId cursorId = cursor->cursorid(); ClientCursorPin ccPin(collection->getCursorManager(), cursorId); @@ -335,20 +337,10 @@ public: if (shouldSaveCursor(txn, collection, state, cursorExec)) { // State will be restored on getMore. cursorExec->saveState(); + cursorExec->detachFromOperationContext(); cursor->setLeftoverMaxTimeMicros(CurOp::get(txn)->getRemainingMaxTimeMicros()); cursor->setPos(numResults); - - // Don't stash the RU for tailable cursors at EOF, let them get a new RU on their - // next getMore. - if (!(pq.isTailable() && state == PlanExecutor::IS_EOF)) { - // We stash away the RecoveryUnit in the ClientCursor. It's used for - // subsequent getMore requests. The calling OpCtx gets a fresh RecoveryUnit. - txn->recoveryUnit()->abandonSnapshot(); - cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); - StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine(); - txn->setRecoveryUnit(engine->newRecoveryUnit(), OperationContext::kNotInUnitOfWork); - } } else { cursorId = 0; } diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 980ce11703c..06a2d14b97d 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -244,14 +244,8 @@ public: // On early return, get rid of the cursor. ScopeGuard cursorFreer = MakeGuard(&GetMoreCmd::cleanupCursor, txn, &ccPin, request); - if (!cursor->hasRecoveryUnit()) { - // Start using a new RecoveryUnit. - cursor->setOwnedRecoveryUnit( - getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit()); - } - - // Swap RecoveryUnit(s) between the ClientCursor and OperationContext. - ScopedRecoveryUnitSwapper ruSwapper(cursor, txn); + if (cursor->isReadCommitted()) + uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); // Reset timeout timer on the cursor since the cursor is still in use. cursor->setIdleTime(0); @@ -270,7 +264,8 @@ public: } PlanExecutor* exec = cursor->getExecutor(); - exec->restoreState(txn); + exec->reattachToOperationContext(txn); + exec->restoreState(); // If we're tailing a capped collection, retrieve a monotonically increasing insert // counter. @@ -309,7 +304,7 @@ public: notifier.reset(); ctx.reset(new AutoGetCollectionForRead(txn, request.nss)); - exec->restoreState(txn); + exec->restoreState(); // We woke up because either the timed_wait expired, or there was more data. Either // way, attempt to generate another batch of results. @@ -323,6 +318,7 @@ public: respondWithId = request.cursorid; exec->saveState(); + exec->detachFromOperationContext(); // If maxTimeMS was set directly on the getMore rather than being rolled over // from a previous find, then don't roll remaining micros over to the next @@ -332,12 +328,6 @@ public: } cursor->incPos(numResults); - - if (isCursorTailable(cursor) && state == PlanExecutor::IS_EOF) { - // Rather than swapping their existing RU into the client cursor, tailable - // cursors should get a new recovery unit. - ruSwapper.dismiss(); - } } else { CurOp::get(txn)->debug().cursorExhausted = true; } diff --git a/src/mongo/db/commands/list_collections.cpp b/src/mongo/db/commands/list_collections.cpp index 8fb6f063590..dd3549d15cc 100644 --- a/src/mongo/db/commands/list_collections.cpp +++ b/src/mongo/db/commands/list_collections.cpp @@ -194,8 +194,12 @@ public: CursorId cursorId = 0LL; if (!exec->isEOF()) { exec->saveState(); - ClientCursor* cursor = new ClientCursor( - CursorManager::getGlobalCursorManager(), exec.release(), cursorNamespace); + exec->detachFromOperationContext(); + ClientCursor* cursor = + new ClientCursor(CursorManager::getGlobalCursorManager(), + exec.release(), + cursorNamespace, + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); cursorId = cursor->cursorid(); } diff --git a/src/mongo/db/commands/list_indexes.cpp b/src/mongo/db/commands/list_indexes.cpp index 06d08e9cce0..0f46e4e7427 100644 --- a/src/mongo/db/commands/list_indexes.cpp +++ b/src/mongo/db/commands/list_indexes.cpp @@ -192,8 +192,12 @@ public: CursorId cursorId = 0LL; if (!exec->isEOF()) { exec->saveState(); - ClientCursor* cursor = new ClientCursor( - CursorManager::getGlobalCursorManager(), exec.release(), cursorNamespace); + exec->detachFromOperationContext(); + ClientCursor* cursor = + new ClientCursor(CursorManager::getGlobalCursorManager(), + exec.release(), + cursorNamespace, + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); cursorId = cursor->cursorid(); } diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 307f848d3f9..c275157e762 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1048,7 +1048,7 @@ void State::finalReduce(CurOp* op, ProgressMeterHolder& pm) { prev = o; all.push_back(o); - if (!exec->restoreState(_txn)) { + if (!exec->restoreState()) { break; } @@ -1429,7 +1429,7 @@ public: scopedXact.reset(new ScopedTransaction(txn, MODE_IS)); scopedAutoDb.reset(new AutoGetDb(txn, nss.db(), MODE_S)); - exec->restoreState(txn); + exec->restoreState(); // Need to reload the database, in case it was dropped after we // released the lock diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index c1dcf50dfe0..351fc4f7711 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -39,7 +39,7 @@ #include "mongo/db/query/cursor_responses.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" -#include "mongo/util/touch_pages.h" +#include "mongo/base/checked_cast.h" namespace mongo { @@ -111,7 +111,7 @@ public: numCursors = iterators.size(); } - OwnedPointerVector<PlanExecutor> execs; + std::vector<std::unique_ptr<PlanExecutor>> execs; for (size_t i = 0; i < numCursors; i++) { unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); unique_ptr<MultiIteratorStage> mis = @@ -121,50 +121,35 @@ public: auto statusWithPlanExecutor = PlanExecutor::make( txn, std::move(ws), std::move(mis), collection, PlanExecutor::YIELD_AUTO); invariant(statusWithPlanExecutor.isOK()); - unique_ptr<PlanExecutor> curExec = std::move(statusWithPlanExecutor.getValue()); - - // The PlanExecutor was registered on construction due to the YIELD_AUTO policy. - // We have to deregister it, as it will be registered with ClientCursor. - curExec->deregisterExec(); - - // Need to save state while yielding locks between now and getMore(). - curExec->saveState(); - - execs.push_back(curExec.release()); + execs.push_back(std::move(statusWithPlanExecutor.getValue())); } // transfer iterators to executors using a round-robin distribution. // TODO consider using a common work queue once invalidation issues go away. for (size_t i = 0; i < iterators.size(); i++) { - PlanExecutor* theExec = execs[i % execs.size()]; - MultiIteratorStage* mis = static_cast<MultiIteratorStage*>(theExec->getRootStage()); - - // This wasn't called above as they weren't assigned yet - iterators[i]->savePositioned(); - + auto& planExec = execs[i % execs.size()]; + MultiIteratorStage* mis = checked_cast<MultiIteratorStage*>(planExec->getRootStage()); mis->addIterator(std::move(iterators[i])); } { BSONArrayBuilder bucketsBuilder; - for (size_t i = 0; i < execs.size(); i++) { + for (auto&& exec : execs) { + // The PlanExecutor was registered on construction due to the YIELD_AUTO policy. + // We have to deregister it, as it will be registered with ClientCursor. + exec->deregisterExec(); + + // Need to save state while yielding locks between now and getMore(). + exec->saveState(); + exec->detachFromOperationContext(); + // transfer ownership of an executor to the ClientCursor (which manages its own // lifetime). ClientCursor* cc = - new ClientCursor(collection->getCursorManager(), execs.releaseAt(i), ns.ns()); - - if (cmdObj["$readMajorityTemporaryName"].trueValue()) { - // Need to make RecoveryUnits for each cursor so that the getMores know to - // use readMajority. This will need to be replaced with a setting on the - // client cursor once we resolve SERVER-17364. - StorageEngine* storageEngine = - getGlobalServiceContext()->getGlobalStorageEngine(); - std::unique_ptr<RecoveryUnit> newRu(storageEngine->newRecoveryUnit()); - // Wouldn't have entered run() if not supported. - invariantOK(newRu->setReadFromMajorityCommittedSnapshot()); - - cc->setOwnedRecoveryUnit(newRu.release()); - } + new ClientCursor(collection->getCursorManager(), + exec.release(), + ns.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); BSONObjBuilder threadResult; appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &threadResult); diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index 43e9d488427..9d971447dd9 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -130,22 +130,10 @@ static bool handleCursorCommand(OperationContext* txn, CurOp::get(txn)->debug().cursorid = cursor->cursorid(); - if (txn->getClient()->isInDirectClient()) { - cursor->setUnownedRecoveryUnit(txn->recoveryUnit()); - } else { - // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent - // getMore requests. The calling OpCtx gets a fresh RecoveryUnit. - txn->recoveryUnit()->abandonSnapshot(); - cursor->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(), - OperationContext::kNotInUnitOfWork) == - OperationContext::kNotInUnitOfWork); - } - // Cursor needs to be in a saved state while we yield locks for getmore. State // will be restored in getMore(). exec->saveState(); + exec->detachFromOperationContext(); } const long long cursorId = cursor ? cursor->cursorid() : 0LL; @@ -263,12 +251,14 @@ public: if (collection) { const bool isAggCursor = true; // enable special locking behavior - ClientCursor* cursor = new ClientCursor(collection->getCursorManager(), - exec.release(), - nss.ns(), - 0, - cmdObj.getOwned(), - isAggCursor); + ClientCursor* cursor = + new ClientCursor(collection->getCursorManager(), + exec.release(), + nss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + 0, + cmdObj.getOwned(), + isAggCursor); pin.reset(new ClientCursorPin(collection->getCursorManager(), cursor->cursorid())); // Don't add any code between here and the start of the try block. } diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index eef18cfcfeb..5f2f88c04ca 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -103,11 +103,15 @@ public: // it now so that it can be registed with ClientCursor. exec->deregisterExec(); exec->saveState(); + exec->detachFromOperationContext(); // ClientCursors' constructor inserts them into a global map that manages their // lifetimes. That is why the next line isn't leaky. ClientCursor* cc = - new ClientCursor(collection->getCursorManager(), exec.release(), ns.ns()); + new ClientCursor(collection->getCursorManager(), + exec.release(), + ns.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot()); appendCursorResponseObject(cc->cursorid(), ns.ns(), BSONArray(), &result); diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 8c219052961..e65f249a66f 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -640,7 +640,7 @@ public: } // Have the lock again. See if we were killed. - if (!exec->restoreState(txn)) { + if (!exec->restoreState()) { if (!partialOk) { uasserted(13281, "File deleted during filemd5 command"); } diff --git a/src/mongo/db/exec/cached_plan.cpp b/src/mongo/db/exec/cached_plan.cpp index 7ad6ef16a6e..dd0b751c326 100644 --- a/src/mongo/db/exec/cached_plan.cpp +++ b/src/mongo/db/exec/cached_plan.cpp @@ -292,7 +292,7 @@ PlanStage::StageState CachedPlanStage::work(WorkingSetID* out) { } -void CachedPlanStage::doRestoreState(OperationContext* opCtx) { +void CachedPlanStage::doReattachToOperationContext(OperationContext* opCtx) { _txn = opCtx; } diff --git a/src/mongo/db/exec/cached_plan.h b/src/mongo/db/exec/cached_plan.h index 117e6aa5dfa..7cf4a1ebc88 100644 --- a/src/mongo/db/exec/cached_plan.h +++ b/src/mongo/db/exec/cached_plan.h @@ -65,7 +65,7 @@ public: virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual StageType stageType() const { diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 9eee74c4451..ef5fb5f9fee 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -215,23 +215,33 @@ void CollectionScan::doInvalidate(OperationContext* txn, } void CollectionScan::doSaveState() { - _txn = NULL; if (_cursor) { _cursor->savePositioned(); } } -void CollectionScan::doRestoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; +void CollectionScan::doRestoreState() { if (_cursor) { - if (!_cursor->restore(opCtx)) { - warning() << "Could not restore RecordCursor for CollectionScan: " << opCtx->getNS(); + if (!_cursor->restore()) { + warning() << "Could not restore RecordCursor for CollectionScan: " << _txn->getNS(); _isDead = true; } } } +void CollectionScan::doDetachFromOperationContext() { + _txn = NULL; + if (_cursor) + _cursor->detachFromOperationContext(); +} + +void CollectionScan::doReattachToOperationContext(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + if (_cursor) + _cursor->reattachToOperationContext(opCtx); +} + unique_ptr<PlanStageStats> CollectionScan::getStats() { // Add a BSON representation of the filter to the stats tree, if there is one. if (NULL != _filter) { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 856af5e10ad..e08d583a7a3 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -59,7 +59,9 @@ public: virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_COLLSCAN; diff --git a/src/mongo/db/exec/count.cpp b/src/mongo/db/exec/count.cpp index bcc389a488b..bb21de3cf81 100644 --- a/src/mongo/db/exec/count.cpp +++ b/src/mongo/db/exec/count.cpp @@ -163,7 +163,7 @@ PlanStage::StageState CountStage::work(WorkingSetID* out) { return PlanStage::NEED_TIME; } -void CountStage::doRestoreState(OperationContext* opCtx) { +void CountStage::doReattachToOperationContext(OperationContext* opCtx) { _txn = opCtx; } diff --git a/src/mongo/db/exec/count.h b/src/mongo/db/exec/count.h index ccb4a4b364a..110c9923119 100644 --- a/src/mongo/db/exec/count.h +++ b/src/mongo/db/exec/count.h @@ -56,7 +56,7 @@ public: virtual bool isEOF(); virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_COUNT; diff --git a/src/mongo/db/exec/count_scan.cpp b/src/mongo/db/exec/count_scan.cpp index e6a9f4b667c..6ba9d710f5d 100644 --- a/src/mongo/db/exec/count_scan.cpp +++ b/src/mongo/db/exec/count_scan.cpp @@ -121,23 +121,32 @@ bool CountScan::isEOF() { } void CountScan::doSaveState() { - _txn = NULL; if (_cursor) _cursor->savePositioned(); } -void CountScan::doRestoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - +void CountScan::doRestoreState() { if (_cursor) - _cursor->restore(opCtx); + _cursor->restore(); // This can change during yielding. // TODO this isn't sufficient. See SERVER-17678. _shouldDedup = _descriptor->isMultikey(_txn); } +void CountScan::doDetachFromOperationContext() { + _txn = NULL; + if (_cursor) + _cursor->detachFromOperationContext(); +} + +void CountScan::doReattachToOperationContext(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + if (_cursor) + _cursor->reattachToOperationContext(opCtx); +} + void CountScan::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { // The only state we're responsible for holding is what RecordIds to drop. If a document // mutates the underlying index cursor will deal with it. diff --git a/src/mongo/db/exec/count_scan.h b/src/mongo/db/exec/count_scan.h index ffb0035417d..dbbd85c5715 100644 --- a/src/mongo/db/exec/count_scan.h +++ b/src/mongo/db/exec/count_scan.h @@ -71,7 +71,9 @@ public: virtual StageState work(WorkingSetID* out); virtual bool isEOF(); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual StageType stageType() const { diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index bdc78806521..4a9f6638c2d 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -207,7 +207,7 @@ PlanStage::StageState DeleteStage::work(WorkingSetID* out) { // transaction in which they are created, and a WriteUnitOfWork is a // transaction, make sure to restore the state outside of the WritUnitOfWork. try { - child()->restoreState(_txn); + child()->restoreState(); } catch (const WriteConflictException& wce) { // Note we don't need to retry anything in this case since the delete already // was committed. However, we still need to return the deleted document @@ -257,8 +257,7 @@ PlanStage::StageState DeleteStage::work(WorkingSetID* out) { return status; } -void DeleteStage::doRestoreState(OperationContext* opCtx) { - _txn = opCtx; +void DeleteStage::doRestoreState() { const NamespaceString& ns(_collection->ns()); massert(28537, str::stream() << "Demoted from primary while removing from " << ns.ns(), @@ -266,6 +265,10 @@ void DeleteStage::doRestoreState(OperationContext* opCtx) { repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(ns)); } +void DeleteStage::doReattachToOperationContext(OperationContext* opCtx) { + _txn = opCtx; +} + unique_ptr<PlanStageStats> DeleteStage::getStats() { _commonStats.isEOF = isEOF(); unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(_commonStats, STAGE_DELETE); diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h index 76f6afdb123..1b85a37474d 100644 --- a/src/mongo/db/exec/delete.h +++ b/src/mongo/db/exec/delete.h @@ -89,7 +89,8 @@ public: virtual bool isEOF(); virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_DELETE; diff --git a/src/mongo/db/exec/distinct_scan.cpp b/src/mongo/db/exec/distinct_scan.cpp index d838177b456..f663bf99cee 100644 --- a/src/mongo/db/exec/distinct_scan.cpp +++ b/src/mongo/db/exec/distinct_scan.cpp @@ -129,19 +129,27 @@ bool DistinctScan::isEOF() { } void DistinctScan::doSaveState() { - _txn = NULL; - // We always seek, so we don't care where the cursor is. if (_cursor) _cursor->saveUnpositioned(); } -void DistinctScan::doRestoreState(OperationContext* opCtx) { +void DistinctScan::doRestoreState() { + if (_cursor) + _cursor->restore(); +} + +void DistinctScan::doDetachFromOperationContext() { + _txn = NULL; + if (_cursor) + _cursor->detachFromOperationContext(); +} + +void DistinctScan::doReattachToOperationContext(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; - if (_cursor) - _cursor->restore(opCtx); + _cursor->reattachToOperationContext(opCtx); } unique_ptr<PlanStageStats> DistinctScan::getStats() { diff --git a/src/mongo/db/exec/distinct_scan.h b/src/mongo/db/exec/distinct_scan.h index b39ab6e9acc..36b20477a24 100644 --- a/src/mongo/db/exec/distinct_scan.h +++ b/src/mongo/db/exec/distinct_scan.h @@ -80,7 +80,9 @@ public: virtual StageState work(WorkingSetID* out); virtual bool isEOF(); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_DISTINCT_SCAN; diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp index c31d9b3f45f..85c404c17e6 100644 --- a/src/mongo/db/exec/fetch.cpp +++ b/src/mongo/db/exec/fetch.cpp @@ -160,16 +160,26 @@ PlanStage::StageState FetchStage::work(WorkingSetID* out) { } void FetchStage::doSaveState() { - _txn = NULL; if (_cursor) _cursor->saveUnpositioned(); } -void FetchStage::doRestoreState(OperationContext* opCtx) { +void FetchStage::doRestoreState() { + if (_cursor) + _cursor->restore(); +} + +void FetchStage::doDetachFromOperationContext() { + _txn = NULL; + if (_cursor) + _cursor->detachFromOperationContext(); +} + +void FetchStage::doReattachToOperationContext(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; if (_cursor) - _cursor->restore(opCtx); + _cursor->reattachToOperationContext(opCtx); } void FetchStage::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h index edfd6c35a5f..4e1bffcdeda 100644 --- a/src/mongo/db/exec/fetch.h +++ b/src/mongo/db/exec/fetch.h @@ -61,7 +61,9 @@ public: virtual StageState work(WorkingSetID* out); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual StageType stageType() const { diff --git a/src/mongo/db/exec/group.cpp b/src/mongo/db/exec/group.cpp index a3049a1f787..891fc232796 100644 --- a/src/mongo/db/exec/group.cpp +++ b/src/mongo/db/exec/group.cpp @@ -257,7 +257,7 @@ bool GroupStage::isEOF() { return _groupState == GroupState_Done; } -void GroupStage::doRestoreState(OperationContext* opCtx) { +void GroupStage::doReattachToOperationContext(OperationContext* opCtx) { _txn = opCtx; } diff --git a/src/mongo/db/exec/group.h b/src/mongo/db/exec/group.h index 7f282fce25c..97fb39d27e7 100644 --- a/src/mongo/db/exec/group.h +++ b/src/mongo/db/exec/group.h @@ -90,7 +90,7 @@ public: virtual StageState work(WorkingSetID* out); virtual bool isEOF(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_GROUP; diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index 2d9a67cfa4f..afc7507a54f 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -200,16 +200,26 @@ PlanStage::StageState IDHackStage::advance(WorkingSetID id, } void IDHackStage::doSaveState() { - _txn = NULL; if (_recordCursor) _recordCursor->saveUnpositioned(); } -void IDHackStage::doRestoreState(OperationContext* opCtx) { +void IDHackStage::doRestoreState() { + if (_recordCursor) + _recordCursor->restore(); +} + +void IDHackStage::doDetachFromOperationContext() { + _txn = NULL; + if (_recordCursor) + _recordCursor->detachFromOperationContext(); +} + +void IDHackStage::doReattachToOperationContext(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; if (_recordCursor) - _recordCursor->restore(opCtx); + _recordCursor->reattachToOperationContext(opCtx); } void IDHackStage::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/idhack.h b/src/mongo/db/exec/idhack.h index 52446c064e2..19b12cbbc09 100644 --- a/src/mongo/db/exec/idhack.h +++ b/src/mongo/db/exec/idhack.h @@ -59,7 +59,9 @@ public: virtual StageState work(WorkingSetID* out); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); /** diff --git a/src/mongo/db/exec/index_scan.cpp b/src/mongo/db/exec/index_scan.cpp index 34f93786b6f..32dfb4c8121 100644 --- a/src/mongo/db/exec/index_scan.cpp +++ b/src/mongo/db/exec/index_scan.cpp @@ -234,12 +234,6 @@ bool IndexScan::isEOF() { } void IndexScan::doSaveState() { - if (!_txn) { - // We were already saved. Nothing to do. - return; - } - _txn = NULL; - if (!_indexCursor) return; @@ -251,11 +245,22 @@ void IndexScan::doSaveState() { _indexCursor->savePositioned(); } -void IndexScan::doRestoreState(OperationContext* opCtx) { +void IndexScan::doRestoreState() { + if (_indexCursor) + _indexCursor->restore(); +} + +void IndexScan::doDetachFromOperationContext() { + _txn = NULL; + if (_indexCursor) + _indexCursor->detachFromOperationContext(); +} + +void IndexScan::doReattachToOperationContext(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; if (_indexCursor) - _indexCursor->restore(opCtx); + _indexCursor->reattachToOperationContext(opCtx); } void IndexScan::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/index_scan.h b/src/mongo/db/exec/index_scan.h index d1a721588ac..5ae2319800c 100644 --- a/src/mongo/db/exec/index_scan.h +++ b/src/mongo/db/exec/index_scan.h @@ -100,7 +100,9 @@ public: virtual StageState work(WorkingSetID* out); virtual bool isEOF(); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual StageType stageType() const { diff --git a/src/mongo/db/exec/multi_iterator.cpp b/src/mongo/db/exec/multi_iterator.cpp index f3de28c5e54..f08052e06f3 100644 --- a/src/mongo/db/exec/multi_iterator.cpp +++ b/src/mongo/db/exec/multi_iterator.cpp @@ -107,19 +107,31 @@ void MultiIteratorStage::kill() { } void MultiIteratorStage::doSaveState() { + for (auto&& iterator : _iterators) { + iterator->savePositioned(); + } +} + +void MultiIteratorStage::doRestoreState() { + for (auto&& iterator : _iterators) { + if (!iterator->restore()) { + kill(); + } + } +} + +void MultiIteratorStage::doDetachFromOperationContext() { _txn = NULL; - for (size_t i = 0; i < _iterators.size(); i++) { - _iterators[i]->savePositioned(); + for (auto&& iterator : _iterators) { + iterator->detachFromOperationContext(); } } -void MultiIteratorStage::doRestoreState(OperationContext* opCtx) { +void MultiIteratorStage::doReattachToOperationContext(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; - for (size_t i = 0; i < _iterators.size(); i++) { - if (!_iterators[i]->restore(opCtx)) { - kill(); - } + for (auto&& iterator : _iterators) { + iterator->reattachToOperationContext(opCtx); } } diff --git a/src/mongo/db/exec/multi_iterator.h b/src/mongo/db/exec/multi_iterator.h index a0b16d57515..9bcbdb3c1c1 100644 --- a/src/mongo/db/exec/multi_iterator.h +++ b/src/mongo/db/exec/multi_iterator.h @@ -60,7 +60,9 @@ public: void kill(); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); // Returns empty PlanStageStats object diff --git a/src/mongo/db/exec/multi_plan.cpp b/src/mongo/db/exec/multi_plan.cpp index de58312a4b6..18188ab6b68 100644 --- a/src/mongo/db/exec/multi_plan.cpp +++ b/src/mongo/db/exec/multi_plan.cpp @@ -388,7 +388,7 @@ bool MultiPlanStage::workAllPlans(size_t numResults, PlanYieldPolicy* yieldPolic return !doneWorking; } -void MultiPlanStage::doRestoreState(OperationContext* opCtx) { +void MultiPlanStage::doReattachToOperationContext(OperationContext* opCtx) { _txn = opCtx; } diff --git a/src/mongo/db/exec/multi_plan.h b/src/mongo/db/exec/multi_plan.h index 669197f648e..30b5f5f9bbf 100644 --- a/src/mongo/db/exec/multi_plan.h +++ b/src/mongo/db/exec/multi_plan.h @@ -66,7 +66,7 @@ public: virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); diff --git a/src/mongo/db/exec/near.cpp b/src/mongo/db/exec/near.cpp index a79ae985bd5..ceeda89c192 100644 --- a/src/mongo/db/exec/near.cpp +++ b/src/mongo/db/exec/near.cpp @@ -284,7 +284,7 @@ bool NearStage::isEOF() { return SearchState_Finished == _searchState; } -void NearStage::doRestoreState(OperationContext* opCtx) { +void NearStage::doReattachToOperationContext(OperationContext* opCtx) { _txn = opCtx; } diff --git a/src/mongo/db/exec/near.h b/src/mongo/db/exec/near.h index 09a743ae147..57773e42b7c 100644 --- a/src/mongo/db/exec/near.h +++ b/src/mongo/db/exec/near.h @@ -82,7 +82,7 @@ public: virtual bool isEOF(); virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual StageType stageType() const; diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp index 33b41aed43d..f43272caf47 100644 --- a/src/mongo/db/exec/oplogstart.cpp +++ b/src/mongo/db/exec/oplogstart.cpp @@ -181,18 +181,14 @@ void OplogStart::doInvalidate(OperationContext* txn, const RecordId& dl, Invalid } void OplogStart::doSaveState() { - _txn = NULL; for (size_t i = 0; i < _subIterators.size(); i++) { _subIterators[i]->savePositioned(); } } -void OplogStart::doRestoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - +void OplogStart::doRestoreState() { for (size_t i = 0; i < _subIterators.size(); i++) { - if (!_subIterators[i]->restore(opCtx)) { + if (!_subIterators[i]->restore()) { _subIterators.erase(_subIterators.begin() + i); // need to hit same i on next pass through loop i--; @@ -200,6 +196,21 @@ void OplogStart::doRestoreState(OperationContext* opCtx) { } } +void OplogStart::doDetachFromOperationContext() { + _txn = NULL; + for (auto&& iterator : _subIterators) { + iterator->detachFromOperationContext(); + } +} + +void OplogStart::doReattachToOperationContext(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + for (auto&& iterator : _subIterators) { + iterator->reattachToOperationContext(opCtx); + } +} + unique_ptr<PlanStageStats> OplogStart::getStats() { unique_ptr<PlanStageStats> ret = make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_OPLOG_START); diff --git a/src/mongo/db/exec/oplogstart.h b/src/mongo/db/exec/oplogstart.h index d26d8d63b4e..14db9e9b585 100644 --- a/src/mongo/db/exec/oplogstart.h +++ b/src/mongo/db/exec/oplogstart.h @@ -71,7 +71,9 @@ public: virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); // Returns empty PlanStageStats object virtual std::unique_ptr<PlanStageStats> getStats(); diff --git a/src/mongo/db/exec/pipeline_proxy.cpp b/src/mongo/db/exec/pipeline_proxy.cpp index 5c6fa17b251..36e2eff10b6 100644 --- a/src/mongo/db/exec/pipeline_proxy.cpp +++ b/src/mongo/db/exec/pipeline_proxy.cpp @@ -100,13 +100,19 @@ void PipelineProxyStage::doInvalidate(OperationContext* txn, } } -void PipelineProxyStage::doSaveState() { +void PipelineProxyStage::doDetachFromOperationContext() { _pipeline->getContext()->opCtx = NULL; + if (auto child = getChildExecutor()) { + child->detachFromOperationContext(); + } } -void PipelineProxyStage::doRestoreState(OperationContext* opCtx) { +void PipelineProxyStage::doReattachToOperationContext(OperationContext* opCtx) { invariant(_pipeline->getContext()->opCtx == NULL); _pipeline->getContext()->opCtx = opCtx; + if (auto child = getChildExecutor()) { + child->reattachToOperationContext(opCtx); + } } void PipelineProxyStage::pushBack(const BSONObj& obj) { diff --git a/src/mongo/db/exec/pipeline_proxy.h b/src/mongo/db/exec/pipeline_proxy.h index 517cf6ef393..84f1a867f2e 100644 --- a/src/mongo/db/exec/pipeline_proxy.h +++ b/src/mongo/db/exec/pipeline_proxy.h @@ -55,11 +55,10 @@ public: virtual void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type); // - // Manage our OperationContext. We intentionally don't propagate to the child - // Runner as that is handled by DocumentSourceCursor as it needs to. + // Manage our OperationContext. // - virtual void doSaveState(); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doDetachFromOperationContext(); + virtual void doReattachToOperationContext(OperationContext* opCtx); /** * Make obj the next object returned by getNext(). diff --git a/src/mongo/db/exec/plan_stage.cpp b/src/mongo/db/exec/plan_stage.cpp index 5c0f9c95472..13303e94acd 100644 --- a/src/mongo/db/exec/plan_stage.cpp +++ b/src/mongo/db/exec/plan_stage.cpp @@ -43,13 +43,13 @@ void PlanStage::saveState() { doSaveState(); } -void PlanStage::restoreState(OperationContext* opCtx) { +void PlanStage::restoreState() { ++_commonStats.unyields; for (auto&& child : _children) { - child->restoreState(opCtx); + child->restoreState(); } - doRestoreState(opCtx); + doRestoreState(); } void PlanStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { @@ -61,4 +61,20 @@ void PlanStage::invalidate(OperationContext* txn, const RecordId& dl, Invalidati doInvalidate(txn, dl, type); } +void PlanStage::detachFromOperationContext() { + for (auto&& child : _children) { + child->detachFromOperationContext(); + } + + doDetachFromOperationContext(); +} + +void PlanStage::reattachToOperationContext(OperationContext* opCtx) { + for (auto&& child : _children) { + child->reattachToOperationContext(opCtx); + } + + doReattachToOperationContext(opCtx); +} + } // namespace mongo diff --git a/src/mongo/db/exec/plan_stage.h b/src/mongo/db/exec/plan_stage.h index 85884f47f9c..cd552144def 100644 --- a/src/mongo/db/exec/plan_stage.h +++ b/src/mongo/db/exec/plan_stage.h @@ -221,7 +221,28 @@ public: * * Propagates to all children, then calls doRestoreState(). */ - void restoreState(OperationContext* opCtx); + void restoreState(); + + /** + * Detaches from the OperationContext and releases any storage-engine state. + * + * It is only legal to call this when in a "saved" state. While in the "detached" state, it is + * only legal to call reattachToOperationContext or the destructor. It is not legal to call + * detachFromOperationContext() while already in the detached state. + * + * Propagates to all children, then calls doDetachFromOperationContext(). + */ + void detachFromOperationContext(); + + /** + * Reattaches to the OperationContext and reacquires any storage-engine state. + * + * It is only legal to call this in the "detached" state. On return, the cursor is left in a + * "saved" state, so callers must still call restoreState to use this object. + * + * Propagates to all children, then calls doReattachToOperationContext(). + */ + void reattachToOperationContext(OperationContext* opCtx); /** * Notifies a stage that a RecordId is going to be deleted (or in-place updated) so that the @@ -297,11 +318,22 @@ protected: /** * Restores any stage-specific saved state and prepares to handle calls to work(). + */ + virtual void doRestoreState() {} + + /** + * Does stage-specific detaching. + */ + virtual void doDetachFromOperationContext() {} + + /** + * Does stage-specific attaching. * * If the stage needs an OperationContext during its execution, it may keep a handle to the - * provided OperationContext (which is valid until the next call to saveState()). + * provided OperationContext (which is valid until the next call to + * doDetachFromOperationContext()). */ - virtual void doRestoreState(OperationContext* txn) {} + virtual void doReattachToOperationContext(OperationContext* opCtx) {} /** * Does the stage-specific invalidation work. diff --git a/src/mongo/db/exec/queued_data_stage_test.cpp b/src/mongo/db/exec/queued_data_stage_test.cpp index 3b2e89f2577..c3aa0e88d63 100644 --- a/src/mongo/db/exec/queued_data_stage_test.cpp +++ b/src/mongo/db/exec/queued_data_stage_test.cpp @@ -91,7 +91,7 @@ TEST(QueuedDataStageTest, validateStats) { ASSERT_EQUALS(stats->yields, 1U); // unyields - mock->restoreState(NULL); + mock->restoreState(); ASSERT_EQUALS(stats->unyields, 1U); // invalidates diff --git a/src/mongo/db/exec/subplan.cpp b/src/mongo/db/exec/subplan.cpp index bb9b0abb14f..7c6e5584d13 100644 --- a/src/mongo/db/exec/subplan.cpp +++ b/src/mongo/db/exec/subplan.cpp @@ -530,7 +530,7 @@ PlanStage::StageState SubplanStage::work(WorkingSetID* out) { return state; } -void SubplanStage::doRestoreState(OperationContext* opCtx) { +void SubplanStage::doReattachToOperationContext(OperationContext* opCtx) { _txn = opCtx; } diff --git a/src/mongo/db/exec/subplan.h b/src/mongo/db/exec/subplan.h index d1a54db9d34..f85ce59b542 100644 --- a/src/mongo/db/exec/subplan.h +++ b/src/mongo/db/exec/subplan.h @@ -77,7 +77,7 @@ public: virtual bool isEOF(); virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_SUBPLAN; diff --git a/src/mongo/db/exec/text_or.cpp b/src/mongo/db/exec/text_or.cpp index d1e57871593..41c57530a19 100644 --- a/src/mongo/db/exec/text_or.cpp +++ b/src/mongo/db/exec/text_or.cpp @@ -79,20 +79,30 @@ bool TextOrStage::isEOF() { } void TextOrStage::doSaveState() { - _txn = NULL; if (_recordCursor) { _recordCursor->saveUnpositioned(); } } -void TextOrStage::doRestoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; +void TextOrStage::doRestoreState() { if (_recordCursor) { - invariant(_recordCursor->restore(opCtx)); + invariant(_recordCursor->restore()); } } +void TextOrStage::doDetachFromOperationContext() { + _txn = NULL; + if (_recordCursor) + _recordCursor->detachFromOperationContext(); +} + +void TextOrStage::doReattachToOperationContext(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + if (_recordCursor) + _recordCursor->reattachToOperationContext(opCtx); +} + void TextOrStage::doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { // Remove the RecordID from the ScoreMap. ScoreMap::iterator scoreIt = _scores.find(dl); diff --git a/src/mongo/db/exec/text_or.h b/src/mongo/db/exec/text_or.h index a51a91a3919..c2018bda166 100644 --- a/src/mongo/db/exec/text_or.h +++ b/src/mongo/db/exec/text_or.h @@ -87,7 +87,9 @@ public: StageState work(WorkingSetID* out) final; void doSaveState() final; - void doRestoreState(OperationContext* opCtx) final; + void doRestoreState() final; + void doDetachFromOperationContext() final; + void doReattachToOperationContext(OperationContext* opCtx) final; void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) final; StageType stageType() const final { diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 970137bc6e9..a5147ed6e3b 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -902,7 +902,7 @@ PlanStage::StageState UpdateStage::work(WorkingSetID* out) { // As restoreState may restore (recreate) cursors, make sure to restore the // state outside of the WritUnitOfWork. try { - child()->restoreState(_txn); + child()->restoreState(); } catch (const WriteConflictException& wce) { // Note we don't need to retry updating anything in this case since the update // already was committed. However, we still need to return the updated document @@ -958,12 +958,12 @@ PlanStage::StageState UpdateStage::work(WorkingSetID* out) { return status; } -Status UpdateStage::restoreUpdateState(OperationContext* opCtx) { +Status UpdateStage::restoreUpdateState() { const UpdateRequest& request = *_params.request; const NamespaceString& nsString(request.getNamespaceString()); // We may have stepped down during the yield. - bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && + bool userInitiatedWritesAndNotPrimary = _txn->writesAreReplicated() && !repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nsString); if (userInitiatedWritesAndNotPrimary) { @@ -982,17 +982,19 @@ Status UpdateStage::restoreUpdateState(OperationContext* opCtx) { 17270); } - _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(opCtx)); + _params.driver->refreshIndexKeys(lifecycle->getIndexKeys(_txn)); } return Status::OK(); } -void UpdateStage::doRestoreState(OperationContext* opCtx) { - _txn = opCtx; - uassertStatusOK(restoreUpdateState(opCtx)); +void UpdateStage::doRestoreState() { + uassertStatusOK(restoreUpdateState()); } +void UpdateStage::doReattachToOperationContext(OperationContext* opCtx) { + _txn = opCtx; +} unique_ptr<PlanStageStats> UpdateStage::getStats() { _commonStats.isEOF = isEOF(); diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h index 4d59dca50f2..c728bda5392 100644 --- a/src/mongo/db/exec/update.h +++ b/src/mongo/db/exec/update.h @@ -84,7 +84,8 @@ public: virtual bool isEOF(); virtual StageState work(WorkingSetID* out); - virtual void doRestoreState(OperationContext* opCtx); + virtual void doRestoreState(); + virtual void doReattachToOperationContext(OperationContext* opCtx); virtual StageType stageType() const { return STAGE_UPDATE; @@ -163,7 +164,7 @@ private: /** * Helper for restoring the state of this update. */ - Status restoreUpdateState(OperationContext* opCtx); + Status restoreUpdateState(); // Transactional context. Not owned by us. OperationContext* _txn; diff --git a/src/mongo/db/operation_context_impl.cpp b/src/mongo/db/operation_context_impl.cpp index bd05e7712d8..f2c0166876f 100644 --- a/src/mongo/db/operation_context_impl.cpp +++ b/src/mongo/db/operation_context_impl.cpp @@ -99,8 +99,6 @@ RecoveryUnit* OperationContextImpl::recoveryUnit() const { } RecoveryUnit* OperationContextImpl::releaseRecoveryUnit() { - if (_recovery.get()) - _recovery->beingReleasedFromOperationContext(); return _recovery.release(); } @@ -109,8 +107,6 @@ OperationContext::RecoveryUnitState OperationContextImpl::setRecoveryUnit(Recove _recovery.reset(unit); RecoveryUnitState oldState = _ruState; _ruState = state; - if (unit) - unit->beingSetOnOperationContext(); return oldState; } diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 702852f53b2..12a1725f7fd 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -88,7 +88,7 @@ void DocumentSourceCursor::loadBatch() { const NamespaceString nss(_ns); AutoGetCollectionForRead autoColl(pExpCtx->opCtx, nss); - _exec->restoreState(pExpCtx->opCtx); + _exec->restoreState(); int memUsageBytes = 0; BSONObj obj; @@ -172,7 +172,7 @@ Value DocumentSourceCursor::serialize(bool explain) const { massert(17392, "No _exec. Were we disposed before explained?", _exec); - _exec->restoreState(pExpCtx->opCtx); + _exec->restoreState(); Explain::explainStages(_exec.get(), ExplainCommon::QUERY_PLANNER, &explainBuilder); _exec->saveState(); } diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 30c270eced1..5a65d218b2c 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -73,36 +73,6 @@ const int32_t MaxBytesToReturnToClientAtOnce = 4 * 1024 * 1024; // Failpoint for checking whether we've received a getmore. MONGO_FP_DECLARE(failReceivedGetmore); -ScopedRecoveryUnitSwapper::ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn) - : _cc(cc), _txn(txn), _dismissed(false) { - // Save this for later. We restore it upon destruction. - _txn->recoveryUnit()->abandonSnapshot(); - _txnPreviousRecoveryUnit.reset(txn->releaseRecoveryUnit()); - - // Transfer ownership of the RecoveryUnit from the ClientCursor to the OpCtx. - RecoveryUnit* ccRecoveryUnit = cc->releaseOwnedRecoveryUnit(); - _txnPreviousRecoveryUnitState = - txn->setRecoveryUnit(ccRecoveryUnit, OperationContext::kNotInUnitOfWork); -} - -void ScopedRecoveryUnitSwapper::dismiss() { - _dismissed = true; -} - -ScopedRecoveryUnitSwapper::~ScopedRecoveryUnitSwapper() { - _txn->recoveryUnit()->abandonSnapshot(); - - if (_dismissed) { - // Just clean up the recovery unit which we originally got from the ClientCursor. - delete _txn->releaseRecoveryUnit(); - } else { - // Swap the RU back into the ClientCursor for subsequent getMores. - _cc->setOwnedRecoveryUnit(_txn->releaseRecoveryUnit()); - } - - _txn->setRecoveryUnit(_txnPreviousRecoveryUnit.release(), _txnPreviousRecoveryUnitState); -} - /** * If ntoreturn is zero, we stop generating additional results as soon as we have either 101 * documents or at least 1MB of data. On subsequent getmores, there is no limit on the number @@ -359,16 +329,6 @@ QueryResult::View getMore(OperationContext* txn, // CC, so don't delete it. ClientCursorPin ccPin(cursorManager, cursorid); ClientCursor* cc = ccPin.c(); - - // If we're not being called from DBDirectClient we want to associate the RecoveryUnit - // used to create the execution machinery inside the cursor with our OperationContext. - // If we throw or otherwise exit this method in a disorderly fashion, we must ensure - // that further calls to getMore won't fail, and that the provided OperationContext - // has a valid RecoveryUnit. As such, we use RAII to accomplish this. - // - // This must be destroyed before the ClientCursor is destroyed. - unique_ptr<ScopedRecoveryUnitSwapper> ruSwapper; - // These are set in the QueryResult msg we return. int resultFlags = ResultFlag_AwaitCapable; @@ -392,19 +352,8 @@ QueryResult::View getMore(OperationContext* txn, ns == cc->ns()); *isCursorAuthorized = true; - // Restore the RecoveryUnit if we need to. - if (txn->getClient()->isInDirectClient()) { - if (cc->hasRecoveryUnit()) - invariant(txn->recoveryUnit() == cc->getUnownedRecoveryUnit()); - } else { - if (!cc->hasRecoveryUnit()) { - // Start using a new RecoveryUnit - cc->setOwnedRecoveryUnit( - getGlobalServiceContext()->getGlobalStorageEngine()->newRecoveryUnit()); - } - // Swap RecoveryUnit(s) between the ClientCursor and OperationContext. - ruSwapper = make_unique<ScopedRecoveryUnitSwapper>(cc, txn); - } + if (cc->isReadCommitted()) + uassertStatusOK(txn->recoveryUnit()->setReadFromMajorityCommittedSnapshot()); // Reset timeout timer on the cursor since the cursor is still in use. cc->setIdleTime(0); @@ -436,7 +385,8 @@ QueryResult::View getMore(OperationContext* txn, startingResult = cc->pos(); PlanExecutor* exec = cc->getExecutor(); - exec->restoreState(txn); + exec->reattachToOperationContext(txn); + exec->restoreState(); PlanExecutor::ExecState state; @@ -463,7 +413,7 @@ QueryResult::View getMore(OperationContext* txn, // Reacquiring locks. ctx = make_unique<AutoGetCollectionForRead>(txn, nss); - exec->restoreState(txn); + exec->restoreState(); // We woke up because either the timed_wait expired, or there was more data. Either // way, attempt to generate another batch of results. @@ -485,7 +435,6 @@ QueryResult::View getMore(OperationContext* txn, // pin. Because our ClientCursorPin is declared after our lock is declared, this // will happen under the lock. if (!shouldSaveCursorGetMore(state, exec, isCursorTailable(cc))) { - ruSwapper.reset(); ccPin.deleteUnderlying(); // cc is now invalid, as is the executor @@ -499,16 +448,10 @@ QueryResult::View getMore(OperationContext* txn, // Continue caching the ClientCursor. cc->incPos(numResults); exec->saveState(); + exec->detachFromOperationContext(); LOG(5) << "getMore saving client cursor ended with state " << PlanExecutor::statestr(state) << endl; - if (PlanExecutor::IS_EOF == state && isCursorTailable(cc)) { - if (!txn->getClient()->isInDirectClient()) { - // Don't stash the RU. Get a new one on the next getMore. - ruSwapper->dismiss(); - } - } - // Possibly note slave's position in the oplog. if ((cc->queryOptions() & QueryOption_OplogReplay) && !slaveReadTill.isNull()) { cc->slaveReadTill(slaveReadTill); @@ -698,32 +641,19 @@ std::string runQuery(OperationContext* txn, if (shouldSaveCursor(txn, collection, state, exec.get())) { // We won't use the executor until it's getMore'd. exec->saveState(); + exec->detachFromOperationContext(); // Allocate a new ClientCursor. We don't have to worry about leaking it as it's // inserted into a global map by its ctor. - ClientCursor* cc = new ClientCursor(collection->getCursorManager(), - exec.release(), - nss.ns(), - pq.getOptions(), - pq.getFilter()); + ClientCursor* cc = + new ClientCursor(collection->getCursorManager(), + exec.release(), + nss.ns(), + txn->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), + pq.getOptions(), + pq.getFilter()); ccId = cc->cursorid(); - if (txn->getClient()->isInDirectClient()) { - cc->setUnownedRecoveryUnit(txn->recoveryUnit()); - } else if (state == PlanExecutor::IS_EOF && pq.isTailable()) { - // Don't stash the RU for tailable cursors at EOF, let them get a new RU on their - // next getMore. - } else { - // We stash away the RecoveryUnit in the ClientCursor. It's used for subsequent - // getMore requests. The calling OpCtx gets a fresh RecoveryUnit. - txn->recoveryUnit()->abandonSnapshot(); - cc->setOwnedRecoveryUnit(txn->releaseRecoveryUnit()); - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - invariant(txn->setRecoveryUnit(storageEngine->newRecoveryUnit(), - OperationContext::kNotInUnitOfWork) == - OperationContext::kNotInUnitOfWork); - } - LOG(5) << "caching executor with cursorid " << ccId << " after returning " << numResults << " results" << endl; diff --git a/src/mongo/db/query/find.h b/src/mongo/db/query/find.h index 04675f44d7b..8afaf16b334 100644 --- a/src/mongo/db/query/find.h +++ b/src/mongo/db/query/find.h @@ -41,27 +41,6 @@ namespace mongo { class NamespaceString; class OperationContext; -class ScopedRecoveryUnitSwapper { -public: - ScopedRecoveryUnitSwapper(ClientCursor* cc, OperationContext* txn); - - ~ScopedRecoveryUnitSwapper(); - - /** - * Dismissing the RU swapper causes it to simply free the recovery unit rather than swapping - * it back into the ClientCursor. - */ - void dismiss(); - -private: - ClientCursor* _cc; - OperationContext* _txn; - bool _dismissed; - - std::unique_ptr<RecoveryUnit> _txnPreviousRecoveryUnit; - OperationContext::RecoveryUnitState _txnPreviousRecoveryUnitState; -}; - /** * Returns true if enough results have been prepared to stop adding more to the first batch. * diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index dfc97724b6f..f163e0637b1 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -178,6 +178,7 @@ PlanExecutor::PlanExecutor(OperationContext* opCtx, } Status PlanExecutor::pickBestPlan(YieldPolicy policy) { + invariant(_currentState == kUsable); // For YIELD_AUTO, this will both set an auto yield policy on the PlanExecutor and // register it to receive notifications. this->setYieldPolicy(policy); @@ -251,6 +252,7 @@ OperationContext* PlanExecutor::getOpCtx() const { } void PlanExecutor::saveState() { + invariant(_currentState == kUsable || _currentState == kSaved); if (!killed()) { _root->saveState(); } @@ -264,12 +266,12 @@ void PlanExecutor::saveState() { WorkingSetCommon::prepareForSnapshotChange(_workingSet.get()); } - _opCtx = NULL; + _currentState = kSaved; } -bool PlanExecutor::restoreState(OperationContext* opCtx) { +bool PlanExecutor::restoreState() { try { - return restoreStateWithoutRetrying(opCtx); + return restoreStateWithoutRetrying(); } catch (const WriteConflictException& wce) { if (!_yieldPolicy->allowedToYield()) throw; @@ -279,23 +281,35 @@ bool PlanExecutor::restoreState(OperationContext* opCtx) { } } -bool PlanExecutor::restoreStateWithoutRetrying(OperationContext* opCtx) { - invariant(NULL == _opCtx); - invariant(opCtx); - - _opCtx = opCtx; - +bool PlanExecutor::restoreStateWithoutRetrying() { + invariant(_currentState == kSaved); // We're restoring after a yield or getMore now. If we're a yielding plan executor, reset // the yield timer in order to prevent from yielding again right away. _yieldPolicy->resetTimer(); if (!killed()) { - _root->restoreState(opCtx); + _root->restoreState(); } + _currentState = kUsable; return !killed(); } +void PlanExecutor::detachFromOperationContext() { + invariant(_currentState == kSaved); + _opCtx = nullptr; + _root->detachFromOperationContext(); + _currentState = kDetached; + _everDetachedFromOperationContext = true; +} + +void PlanExecutor::reattachToOperationContext(OperationContext* txn) { + invariant(_currentState == kDetached); + _opCtx = txn; + _root->reattachToOperationContext(txn); + _currentState = kSaved; +} + void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { if (!killed()) { _root->invalidate(txn, dl, type); @@ -304,7 +318,7 @@ void PlanExecutor::invalidate(OperationContext* txn, const RecordId& dl, Invalid PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) { Snapshotted<BSONObj> snapshotted; - ExecState state = getNextSnapshotted(objOut ? &snapshotted : NULL, dlOut); + ExecState state = getNextImpl(objOut ? &snapshotted : NULL, dlOut); if (objOut) { *objOut = snapshotted.value(); @@ -315,6 +329,13 @@ PlanExecutor::ExecState PlanExecutor::getNext(BSONObj* objOut, RecordId* dlOut) PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { + // Detaching from the OperationContext means that the returned snapshot ids could be invalid. + invariant(!_everDetachedFromOperationContext); + return getNextImpl(objOut, dlOut); +} + +PlanExecutor::ExecState PlanExecutor::getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut) { + invariant(_currentState == kUsable); if (killed()) { if (NULL != objOut) { Status status(ErrorCodes::OperationFailed, @@ -453,6 +474,7 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o } bool PlanExecutor::isEOF() { + invariant(_currentState == kUsable); return killed() || (_stash.empty() && _root->isEOF()); } @@ -493,6 +515,7 @@ void PlanExecutor::kill(string reason) { } Status PlanExecutor::executePlan() { + invariant(_currentState == kUsable); BSONObj obj; PlanExecutor::ExecState state = PlanExecutor::ADVANCED; while (PlanExecutor::ADVANCED == state) { diff --git a/src/mongo/db/query/plan_executor.h b/src/mongo/db/query/plan_executor.h index 83e0c7eb6cd..ed21f8b047a 100644 --- a/src/mongo/db/query/plan_executor.h +++ b/src/mongo/db/query/plan_executor.h @@ -228,9 +228,10 @@ public: // /** - * Save any state required to either - * 1. hibernate waiting for a getMore, or - * 2. yield the lock (on applicable storage engines) to allow writes to proceed. + * Save any state required to recover from changes to the underlying collection's data. + * + * While in the "saved" state, it is only legal to call restoreState, + * detachFromOperationContext, or the destructor. */ void saveState(); @@ -244,7 +245,24 @@ public: * * Returns false otherwise. The execution tree cannot be worked and should be deleted. */ - bool restoreState(OperationContext* opCtx); + bool restoreState(); + + /** + * Detaches from the OperationContext and releases any storage-engine state. + * + * It is only legal to call this when in a "saved" state. While in the "detached" state, it is + * only legal to call reattachToOperationContext or the destructor. It is not legal to call + * detachFromOperationContext() while already in the detached state. + */ + void detachFromOperationContext(); + + /** + * Reattaches to the OperationContext and reacquires any storage-engine state. + * + * It is only legal to call this in the "detached" state. On return, the cursor is left in a + * "saved" state, so callers must still call restoreState to use this object. + */ + void reattachToOperationContext(OperationContext* opCtx); /** * Same as restoreState but without the logic to retry if a WriteConflictException is @@ -252,7 +270,7 @@ public: * * This is only public for PlanYieldPolicy. DO NOT CALL ANYWHERE ELSE. */ - bool restoreStateWithoutRetrying(OperationContext* opCtx); + bool restoreStateWithoutRetrying(); // // Running Support @@ -351,6 +369,8 @@ public: void enqueue(const BSONObj& obj); private: + ExecState getNextImpl(Snapshotted<BSONObj>* objOut, RecordId* dlOut); + /** * RAII approach to ensuring that plan executors are deregistered. * @@ -441,6 +461,10 @@ private: // to consume yet. We empty the queue before retrieving further results from the plan // stages. std::queue<BSONObj> _stash; + + enum { kUsable, kSaved, kDetached } _currentState = kUsable; + + bool _everDetachedFromOperationContext = false; }; } // namespace mongo diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index 45d996893a9..da41e863db1 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -97,7 +97,7 @@ bool PlanYieldPolicy::yield(RecordFetcher* fetcher) { QueryYield::yieldAllLocks(opCtx, fetcher); } - return _planYielding->restoreStateWithoutRetrying(opCtx); + return _planYielding->restoreStateWithoutRetrying(); } catch (const WriteConflictException& wce) { CurOp::get(opCtx)->debug().writeConflicts++; WriteConflictException::logAndBackoff( diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index a3de8953291..392c81a080c 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -143,7 +143,7 @@ Status rebuildIndexesOnCollection(OperationContext* txn, rs->deleteRecord(txn, id); wunit.commit(); } - cursor->restore(txn); + cursor->restore(); continue; } diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 25ebf6a5de6..25a61a9edb2 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -47,9 +47,11 @@ public: return {}; } void savePositioned() final {} - bool restore(OperationContext* txn) final { + bool restore() final { return true; } + void detachFromOperationContext() final {} + void reattachToOperationContext(OperationContext* txn) final {} }; class DevNullRecordStore : public RecordStore { diff --git a/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp b/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp index f40dff8e7ff..30cabb59716 100644 --- a/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp +++ b/src/mongo/db/storage/in_memory/in_memory_btree_impl.cpp @@ -299,14 +299,11 @@ public: } void saveUnpositioned() override { - _txn = nullptr; _savedAtEnd = true; // Doing nothing with end cursor since it will do full reseek on restore. } - void restore(OperationContext* txn) override { - _txn = txn; - + void restore() override { // Always do a full seek on restore. We cannot use our last position since index // entries may have been inserted closer to our endpoint and we would need to move // over them. @@ -324,6 +321,14 @@ public: || _data.value_comp().compare(*_it, {_savedKey, _savedLoc}) != 0; } + void detachFromOperationContext() final { + _txn = nullptr; + } + + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + } + private: bool atEndPoint() const { return _endState && _it == _endState->it; diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp index af596f7a569..7c3b708d513 100644 --- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp +++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp @@ -112,7 +112,7 @@ private: class InMemoryRecordStore::Cursor final : public RecordCursor { public: Cursor(OperationContext* txn, const InMemoryRecordStore& rs) - : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {} + : _records(rs._data->records), _isCapped(rs.isCapped()) {} boost::optional<Record> next() final { if (_needFirstSeek) { @@ -138,18 +138,15 @@ public: } void savePositioned() final { - _txn = nullptr; if (!_needFirstSeek && !_lastMoveWasRestore) _savedId = _it == _records.end() ? RecordId() : _it->first; } void saveUnpositioned() final { - _txn = nullptr; _savedId = RecordId(); } - bool restore(OperationContext* txn) final { - _txn = txn; + bool restore() final { if (_savedId.isNull()) { _it = _records.end(); return true; @@ -162,8 +159,10 @@ public: return !(_isCapped && _lastMoveWasRestore); } + void detachFromOperationContext() final {} + void reattachToOperationContext(OperationContext* txn) final {} + private: - unowned_ptr<OperationContext> _txn; Records::const_iterator _it; bool _needFirstSeek = true; bool _lastMoveWasRestore = false; @@ -176,7 +175,7 @@ private: class InMemoryRecordStore::ReverseCursor final : public RecordCursor { public: ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs) - : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {} + : _records(rs._data->records), _isCapped(rs.isCapped()) {} boost::optional<Record> next() final { if (_needFirstSeek) { @@ -212,18 +211,15 @@ public: } void savePositioned() final { - _txn = nullptr; if (!_needFirstSeek && !_lastMoveWasRestore) _savedId = _it == _records.rend() ? RecordId() : _it->first; } void saveUnpositioned() final { - _txn = nullptr; _savedId = RecordId(); } - bool restore(OperationContext* txn) final { - _txn = txn; + bool restore() final { if (_savedId.isNull()) { _it = _records.rend(); return true; @@ -239,8 +235,10 @@ public: return !(_isCapped && _lastMoveWasRestore); } + void detachFromOperationContext() final {} + void reattachToOperationContext(OperationContext* txn) final {} + private: - unowned_ptr<OperationContext> _txn; Records::const_reverse_iterator _it; bool _needFirstSeek = true; bool _lastMoveWasRestore = false; diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp b/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp index ce1aa117fef..1154b296dcf 100644 --- a/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp +++ b/src/mongo/db/storage/mmap_v1/btree/btree_interface.cpp @@ -197,8 +197,6 @@ public: } void savePositioned() override { - _txn = nullptr; - if (!_lastMoveWasRestore) _savedEOF = isEOF(); @@ -215,7 +213,6 @@ public: } void saveUnpositioned() override { - _txn = nullptr; // Don't leak our registration if savePositioned() was previously called. if (!_saved.bucket.isNull()) _btree->savedCursors()->unregisterCursor(&_saved); @@ -224,11 +221,7 @@ public: _savedEOF = true; } - void restore(OperationContext* txn) override { - // guard against accidental double restore - invariant(!_txn); - _txn = txn; - + void restore() override { // Always do a full seek on restore. We cannot use our last position since index // entries may have been inserted closer to our endpoint and we would need to move // over them. @@ -251,6 +244,14 @@ public: || getDiskLoc() != _saved.loc || compareKeys(getKey(), _saved.key) != 0; } + void detachFromOperationContext() final { + _txn = nullptr; + } + + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + } + private: bool isEOF() const { return _bucket.isNull(); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h index 5c0437cce56..7ba485d2a8b 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h @@ -336,9 +336,15 @@ public: boost::optional<Record> seekExact(const RecordId& id) final; void invalidate(const RecordId& dl) final; void savePositioned() final {} - bool restore(OperationContext* txn) final { + bool restore() final { return true; } + void detachFromOperationContext() final { + _txn = nullptr; + } + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + } std::unique_ptr<RecordFetcher> fetcherForNext() const final; private: diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp index 353a7f39c0c..e1487ee20fe 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp @@ -99,12 +99,9 @@ void CappedRecordStoreV1Iterator::invalidate(const RecordId& id) { } } -void CappedRecordStoreV1Iterator::savePositioned() { - _txn = nullptr; -} +void CappedRecordStoreV1Iterator::savePositioned() {} -bool CappedRecordStoreV1Iterator::restore(OperationContext* txn) { - _txn = txn; +bool CappedRecordStoreV1Iterator::restore() { return !_killedByInvalidate; } diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h index 0a366d9921a..3793c5cce4b 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h @@ -50,7 +50,13 @@ public: boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final; void savePositioned() final; - bool restore(OperationContext* txn) final; + bool restore() final; + void detachFromOperationContext() final { + _txn = nullptr; + } + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + } void invalidate(const RecordId& dl) final; std::unique_ptr<RecordFetcher> fetcherForNext() const final; std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const final; diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h index def5178ad8e..eb2dc6cb8e1 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h @@ -47,12 +47,15 @@ public: boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final; void invalidate(const RecordId& dl); - void savePositioned() final { + void savePositioned() final {} + bool restore() final { + return true; + } + void detachFromOperationContext() final { _txn = nullptr; } - bool restore(OperationContext* txn) final { + void reattachToOperationContext(OperationContext* txn) final { _txn = txn; - return true; } // Explicitly not supporting fetcherForNext(). The expected use case for this class is a diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp index babfbcf26ea..5d668004b68 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp @@ -108,12 +108,9 @@ void SimpleRecordStoreV1Iterator::invalidate(const RecordId& dl) { } } -void SimpleRecordStoreV1Iterator::savePositioned() { - _txn = nullptr; -} +void SimpleRecordStoreV1Iterator::savePositioned() {} -bool SimpleRecordStoreV1Iterator::restore(OperationContext* txn) { - _txn = txn; +bool SimpleRecordStoreV1Iterator::restore() { // if the collection is dropped, then the cursor should be destroyed return true; } diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h index 91b0088bf72..4b74864bf1c 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h @@ -50,7 +50,13 @@ public: boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final; void savePositioned() final; - bool restore(OperationContext* txn) final; + bool restore() final; + void detachFromOperationContext() final { + _txn = nullptr; + } + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + } void invalidate(const RecordId& dl) final; std::unique_ptr<RecordFetcher> fetcherForNext() const final; std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const final; diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index c3226fa41e1..6d4fd9dae9f 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -196,7 +196,24 @@ public: * * This handles restoring after either savePositioned() or saveUnpositioned(). */ - virtual bool restore(OperationContext* txn) = 0; + virtual bool restore() = 0; + + /** + * Detaches from the OperationContext and releases any storage-engine state. + * + * It is only legal to call this when in a "saved" state. While in the "detached" state, it is + * only legal to call reattachToOperationContext or the destructor. It is not legal to call + * detachFromOperationContext() while already in the detached state. + */ + virtual void detachFromOperationContext() = 0; + + /** + * Reattaches to the OperationContext and reacquires any storage-engine state. + * + * It is only legal to call this in the "detached" state. On return, the cursor is left in a + * "saved" state, so callers must still call restoreState to use this object. + */ + virtual void reattachToOperationContext(OperationContext* opCtx) = 0; /** * Inform the cursor that this id is being invalidated. diff --git a/src/mongo/db/storage/record_store_test_recorditer.cpp b/src/mongo/db/storage/record_store_test_recorditer.cpp index 3be72344c85..7d42c6bfe2a 100644 --- a/src/mongo/db/storage/record_store_test_recorditer.cpp +++ b/src/mongo/db/storage/record_store_test_recorditer.cpp @@ -316,7 +316,7 @@ TEST(RecordStoreTestHarness, RecordIteratorEOF) { ASSERT_OK(res.getStatus()); uow.commit(); - ASSERT(cursor->restore(opCtx.get())); + ASSERT(cursor->restore()); // Iterator should still be EOF. ASSERT(!cursor->next()); @@ -369,7 +369,7 @@ TEST(RecordStoreTestHarness, RecordIteratorSavePositionedRestore) { for (int i = 0; i < nToInsert; i++) { cursor->savePositioned(); cursor->savePositioned(); // It is legal to save twice in a row. - cursor->restore(opCtx.get()); + cursor->restore(); const auto record = cursor->next(); ASSERT(record); @@ -379,7 +379,7 @@ TEST(RecordStoreTestHarness, RecordIteratorSavePositionedRestore) { cursor->savePositioned(); cursor->savePositioned(); // It is legal to save twice in a row. - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT(!cursor->next()); } diff --git a/src/mongo/db/storage/record_store_test_repairiter.cpp b/src/mongo/db/storage/record_store_test_repairiter.cpp index 56abdcb6b14..4e21eccc2e1 100644 --- a/src/mongo/db/storage/record_store_test_repairiter.cpp +++ b/src/mongo/db/storage/record_store_test_repairiter.cpp @@ -158,7 +158,7 @@ TEST(RecordStoreTestHarness, GetIteratorForRepairInvalidateSingleton) { // Invalidate the record we're pointing at. cursor->savePositioned(); cursor->invalidate(idToInvalidate); - cursor->restore(opCtx.get()); + cursor->restore(); // Iterator should be EOF now because the only thing in the collection got deleted. ASSERT(!cursor->next()); diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 3bb9d3093ea..d50da13cb95 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -53,9 +53,6 @@ public: virtual void reportState(BSONObjBuilder* b) const {} - virtual void beingReleasedFromOperationContext() {} - virtual void beingSetOnOperationContext() {} - /** * These should be called through WriteUnitOfWork rather than directly. * @@ -109,6 +106,13 @@ public: "Current storage engine does not support $readMajorityTemporaryName"}; } + /** + * Returns true if setReadFromMajorityCommittedSnapshot() has been called. + */ + virtual bool isReadingFromMajorityCommittedSnapshot() { + return false; + } + virtual SnapshotId getSnapshotId() const = 0; /** diff --git a/src/mongo/db/storage/sorted_data_interface.h b/src/mongo/db/storage/sorted_data_interface.h index 2836c7c4814..e373fa1ff44 100644 --- a/src/mongo/db/storage/sorted_data_interface.h +++ b/src/mongo/db/storage/sorted_data_interface.h @@ -327,7 +327,24 @@ public: * * This handles restoring after either savePositioned() or saveUnpositioned(). */ - virtual void restore(OperationContext* txn) = 0; + virtual void restore() = 0; + + /** + * Detaches from the OperationContext and releases any storage-engine state. + * + * It is only legal to call this when in a "saved" state. While in the "detached" state, it + * is only legal to call reattachToOperationContext or the destructor. It is not legal to + * call detachFromOperationContext() while already in the detached state. + */ + virtual void detachFromOperationContext() = 0; + + /** + * Reattaches to the OperationContext and reacquires any storage-engine state. + * + * It is only legal to call this in the "detached" state. On return, the cursor is left in a + * "saved" state, so callers must still call restoreState to use this object. + */ + virtual void reattachToOperationContext(OperationContext* opCtx) = 0; }; /** diff --git a/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp b/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp index 190f707c4c0..e0d31fa90d1 100644 --- a/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp +++ b/src/mongo/db/storage/sorted_data_interface_test_cursor_end_position.cpp @@ -144,7 +144,7 @@ void testSetEndPosition_Seek_Forward(bool unique, bool inclusive) { cursor->saveUnpositioned(); removeFromIndex(opCtx, sorted, {{key3, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->seek(key2, true), boost::none); ASSERT_EQ(cursor->seek(key3, true), boost::none); @@ -192,7 +192,7 @@ void testSetEndPosition_Seek_Reverse(bool unique, bool inclusive) { cursor->saveUnpositioned(); removeFromIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->seek(key3, true), boost::none); ASSERT_EQ(cursor->seek(key2, true), boost::none); @@ -226,7 +226,7 @@ void testSetEndPosition_Restore_Forward(bool unique) { ASSERT_EQ(cursor->seek(key1, true), IndexKeyEntry(key1, loc1)); cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1)); @@ -236,7 +236,7 @@ void testSetEndPosition_Restore_Forward(bool unique) { { {key2, loc1}, {key3, loc1}, }); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), boost::none); } @@ -262,7 +262,7 @@ void testSetEndPosition_Restore_Reverse(bool unique) { ASSERT_EQ(cursor->seek(key4, true), IndexKeyEntry(key4, loc1)); cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(key3, loc1)); @@ -272,7 +272,7 @@ void testSetEndPosition_Restore_Reverse(bool unique) { { {key2, loc1}, {key3, loc1}, }); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), boost::none); } @@ -309,7 +309,7 @@ void testSetEndPosition_RestoreEndCursor_Forward(bool unique) { {key2, loc1}, // in range {key3, loc1}, // out of range }); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->seek(key1, true), IndexKeyEntry(key1, loc1)); ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1)); @@ -342,7 +342,7 @@ void testSetEndPosition_RestoreEndCursor_Reverse(bool unique) { {key2, loc1}, // in range {key3, loc1}, // out of range }); - cursor->restore(opCtx.get()); // must restore end cursor even with saveUnpositioned(). + cursor->restore(); // must restore end cursor even with saveUnpositioned(). ASSERT_EQ(cursor->seek(key4, true), IndexKeyEntry(key4, loc1)); ASSERT_EQ(cursor->next(), IndexKeyEntry(key3, loc1)); diff --git a/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp b/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp index 679bb3f8c8b..c63e6ea15fe 100644 --- a/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp +++ b/src/mongo/db/storage/sorted_data_interface_test_cursor_saverestore.cpp @@ -75,7 +75,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursor) { ASSERT_EQ(entry, IndexKeyEntry(BSON("" << i), RecordId(42, i * 2))); cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); } ASSERT(!cursor->next()); ASSERT_EQ(i, nToInsert); @@ -121,7 +121,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursorReversed) { ASSERT_EQ(entry, IndexKeyEntry(BSON("" << i), RecordId(42, i * 2))); cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); } ASSERT(!cursor->next()); ASSERT_EQ(i, -1); @@ -166,7 +166,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursorWithDupKeys) { ASSERT_EQ(entry, IndexKeyEntry(key1, RecordId(42, i * 2))); cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); } ASSERT(!cursor->next()); ASSERT_EQ(i, nToInsert); @@ -212,7 +212,7 @@ TEST(SortedDataInterface, SaveAndRestorePositionWhileIterateCursorWithDupKeysRev ASSERT_EQ(entry, IndexKeyEntry(key1, RecordId(42, i * 2))); cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); } ASSERT(!cursor->next()); ASSERT_EQ(i, -1); @@ -301,7 +301,7 @@ void testSaveAndRestorePositionSeesNewInserts(bool forward, bool unique) { cursor->savePositioned(); insertToIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1)); } @@ -335,12 +335,12 @@ void testSaveAndRestorePositionSeesNewInsertsAfterRemove(bool forward, bool uniq cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key1, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); // The restore may have seeked since it can't return to the saved position. cursor->savePositioned(); // Should still save originally saved key as "current position". insertToIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1)); } @@ -375,13 +375,13 @@ void testSaveAndRestorePositionSeesNewInsertsAfterEOF(bool forward, bool unique) cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key1, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); // The restore may have seeked to EOF. auto insertPoint = forward ? key2 : key0; cursor->savePositioned(); // Should still save key1 as "current position". insertToIndex(opCtx, sorted, {{insertPoint, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(insertPoint, loc1)); } @@ -415,24 +415,24 @@ void testSaveAndRestorePositionConsidersRecordId_Forward(bool unique) { cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key1, loc1}}); insertToIndex(opCtx, sorted, {{key1, loc2}}); - cursor->restore(opCtx.get()); // Lands on inserted key. + cursor->restore(); // Lands on inserted key. ASSERT_EQ(cursor->next(), IndexKeyEntry(key1, loc2)); cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key1, loc2}}); insertToIndex(opCtx, sorted, {{key1, loc1}}); - cursor->restore(opCtx.get()); // Lands after inserted. + cursor->restore(); // Lands after inserted. ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1)); cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); cursor->savePositioned(); insertToIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); // Lands at same point as initial save. + cursor->restore(); // Lands at same point as initial save. // Advances from restore point since restore didn't move position. ASSERT_EQ(cursor->next(), IndexKeyEntry(key3, loc1)); @@ -460,24 +460,24 @@ void testSaveAndRestorePositionConsidersRecordId_Reverse(bool unique) { cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key2, loc2}}); insertToIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(key2, loc1)); cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key2, loc1}}); insertToIndex(opCtx, sorted, {{key2, loc2}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->next(), IndexKeyEntry(key1, loc1)); cursor->savePositioned(); removeFromIndex(opCtx, sorted, {{key1, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); cursor->savePositioned(); insertToIndex(opCtx, sorted, {{key1, loc1}}); - cursor->restore(opCtx.get()); // Lands at same point as initial save. + cursor->restore(); // Lands at same point as initial save. // Advances from restore point since restore didn't move position. ASSERT_EQ(cursor->next(), IndexKeyEntry(key0, loc1)); @@ -504,12 +504,12 @@ TEST(SortedDataInterface, SaveUnpositionedAndRestore) { cursor->saveUnpositioned(); removeFromIndex(opCtx, sorted, {{key2, loc1}}); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->seek(key1, true), IndexKeyEntry(key1, loc1)); cursor->saveUnpositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); ASSERT_EQ(cursor->seek(key3, true), IndexKeyEntry(key3, loc1)); } diff --git a/src/mongo/db/storage/sorted_data_interface_test_harness.cpp b/src/mongo/db/storage/sorted_data_interface_test_harness.cpp index 13929c7eacc..7947aae82e5 100644 --- a/src/mongo/db/storage/sorted_data_interface_test_harness.cpp +++ b/src/mongo/db/storage/sorted_data_interface_test_harness.cpp @@ -359,7 +359,7 @@ TEST(SortedDataInterface, CursorIterate1WithSaveRestore) { ASSERT_EQ(entry, IndexKeyEntry(BSON("" << n), RecordId(5, n * 2))); n++; cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); } ASSERT_EQUALS(N, n); } @@ -388,7 +388,7 @@ TEST(SortedDataInterface, CursorIterateAllDupKeysWithSaveRestore) { ASSERT_EQ(entry, IndexKeyEntry(BSON("" << 5), RecordId(5, n * 2))); n++; cursor->savePositioned(); - cursor->restore(opCtx.get()); + cursor->restore(); } ASSERT_EQUALS(N, n); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp index bd727575954..d824406dfae 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_index.cpp @@ -595,8 +595,9 @@ namespace { class WiredTigerIndexCursorBase : public SortedDataInterface::Cursor { public: WiredTigerIndexCursorBase(const WiredTigerIndex& idx, OperationContext* txn, bool forward) - : _txn(txn), _cursor(idx.uri(), idx.tableId(), false, txn), _idx(idx), _forward(forward) {} - + : _txn(txn), _idx(idx), _forward(forward) { + _cursor.emplace(_idx.uri(), _idx.tableId(), false, _txn); + } boost::optional<IndexKeyEntry> next(RequestedInfo parts) override { // Advance on a cursor at the end is a no-op if (_eof) @@ -654,24 +655,16 @@ public: } void savePositioned() override { - if (!_txn) - return; // still saved - - _savedForCheck = _txn->recoveryUnit(); - - if (!wt_keeptxnopen()) { - try { - _cursor.reset(); - } catch (const WriteConflictException& wce) { - // Ignore since this is only called when we are about to kill our transaction - // anyway. - } - - // Our saved position is wherever we were when we last called updatePosition(). - // Any partially completed repositions should not effect our saved position. + try { + if (_cursor) + _cursor->reset(); + } catch (const WriteConflictException& wce) { + // Ignore since this is only called when we are about to kill our transaction + // anyway. } - _txn = NULL; + // Our saved position is wherever we were when we last called updatePosition(). + // Any partially completed repositions should not effect our saved position. } void saveUnpositioned() override { @@ -679,21 +672,30 @@ public: _eof = true; } - void restore(OperationContext* txn) override { - // Update the session handle with our new operation context. - invariant(_savedForCheck == txn->recoveryUnit()); - _txn = txn; + void restore() override { + if (!_cursor) { + _cursor.emplace(_idx.uri(), _idx.tableId(), false, _txn); + } - if (!wt_keeptxnopen()) { - if (!_eof) { - // Ensure an active session exists, so any restored cursors will bind to it - WiredTigerRecoveryUnit::get(txn)->getSession(txn); - _lastMoveWasRestore = !seekWTCursor(_key); - TRACE_CURSOR << "restore _lastMoveWasRestore:" << _lastMoveWasRestore; - } + // Ensure an active session exists, so any restored cursors will bind to it + invariant(WiredTigerRecoveryUnit::get(_txn)->getSession(_txn) == _cursor->getSession()); + + if (!_eof) { + _lastMoveWasRestore = !seekWTCursor(_key); + TRACE_CURSOR << "restore _lastMoveWasRestore:" << _lastMoveWasRestore; } } + void detachFromOperationContext() final { + _txn = nullptr; + _cursor = {}; + } + + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + // _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues. + } + protected: // Called after _key has been filled in. Must not throw WriteConflictException. virtual void updateLocAndTypeBits() = 0; @@ -738,7 +740,7 @@ protected: } void advanceWTCursor() { - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); int ret = WT_OP_CHECK(_forward ? c->next(c) : c->prev(c)); if (ret == WT_NOTFOUND) { _cursorAtEof = true; @@ -750,7 +752,7 @@ protected: // Seeks to query. Returns true on exact match. bool seekWTCursor(const KeyString& query) { - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); int cmp = -1; const WiredTigerItem keyItem(query.getBuffer(), query.getSize()); @@ -795,7 +797,7 @@ protected: _eof = false; - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); WT_ITEM item; invariantWTOK(c->get_key(c, &item)); _key.resetFromBuffer(item.data, item.size); @@ -809,13 +811,10 @@ protected: } OperationContext* _txn; - WiredTigerCursor _cursor; + boost::optional<WiredTigerCursor> _cursor; const WiredTigerIndex& _idx; // not owned const bool _forward; - // Ensures we have the same RU at restore time. - RecoveryUnit* _savedForCheck; - // These are where this cursor instance is. They are not changed in the face of a failing // next(). KeyString _key; @@ -844,7 +843,7 @@ public: void updateLocAndTypeBits() override { _loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize()); - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); WT_ITEM item; invariantWTOK(c->get_value(c, &item)); BufReader br(item.data, item.size); @@ -857,8 +856,8 @@ public: WiredTigerIndexUniqueCursor(const WiredTigerIndex& idx, OperationContext* txn, bool forward) : WiredTigerIndexCursorBase(idx, txn, forward) {} - void restore(OperationContext* txn) override { - WiredTigerIndexCursorBase::restore(txn); + void restore() override { + WiredTigerIndexCursorBase::restore(); // In addition to seeking to the correct key, we also need to make sure that the loc is // on the correct side of _loc. @@ -869,7 +868,7 @@ public: // If we get here we need to look at the actual RecordId for this key and make sure we // are supposed to see it. - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); WT_ITEM item; invariantWTOK(c->get_value(c, &item)); @@ -893,7 +892,7 @@ public: // We assume that cursors can only ever see unique indexes in their "pristine" state, // where no duplicates are possible. The cases where dups are allowed should hold // sufficient locks to ensure that no cursor ever sees them. - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); WT_ITEM item; invariantWTOK(c->get_value(c, &item)); @@ -912,7 +911,7 @@ public: _query.resetToKey(stripFieldNames(key), _idx.ordering()); const WiredTigerItem keyItem(_query.getBuffer(), _query.getSize()); - WT_CURSOR* c = _cursor.get(); + WT_CURSOR* c = _cursor->get(); c->set_key(c, keyItem.Get()); // Using search rather than search_near. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index c423cd3e1f9..9008eb4ce2a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -93,16 +93,13 @@ const std::string kWiredTigerEngineName = "wiredTiger"; class WiredTigerRecordStore::Cursor final : public RecordCursor { public: - Cursor(OperationContext* txn, - const WiredTigerRecordStore& rs, - bool forward = true, - bool forParallelCollectionScan = false) + Cursor(OperationContext* txn, const WiredTigerRecordStore& rs, bool forward = true) : _rs(rs), _txn(txn), _forward(forward), - _forParallelCollectionScan(forParallelCollectionScan), - _cursor(new WiredTigerCursor(rs.getURI(), rs.tableId(), true, txn)), - _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) {} + _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) { + _cursor.emplace(rs.getURI(), rs.tableId(), true, txn); + } boost::optional<Record> next() final { if (_eof) @@ -186,27 +183,13 @@ public: } void savePositioned() final { - // It must be safe to call save() twice in a row without calling restore(). - if (!_txn) - return; - - // the cursor and recoveryUnit are valid on restore - // so we just record the recoveryUnit to make sure - _savedRecoveryUnit = _txn->recoveryUnit(); - if (_cursor && !wt_keeptxnopen()) { - try { + try { + if (_cursor) _cursor->reset(); - } catch (const WriteConflictException& wce) { - // Ignore since this is only called when we are about to kill our transaction - // anyway. - } + } catch (const WriteConflictException& wce) { + // Ignore since this is only called when we are about to kill our transaction + // anyway. } - - if (_forParallelCollectionScan) { - // Delete the cursor since we may come back to a different RecoveryUnit - _cursor.reset(); - } - _txn = nullptr; } void saveUnpositioned() final { @@ -214,31 +197,20 @@ public: _lastReturnedId = RecordId(); } - bool restore(OperationContext* txn) final { - _txn = txn; + bool restore() final { + if (!_cursor) + _cursor.emplace(_rs.getURI(), _rs.tableId(), true, _txn); + + // This will ensure an active session exists, so any restored cursors will bind to it + invariant(WiredTigerRecoveryUnit::get(_txn)->getSession(_txn) == _cursor->getSession()); // If we've hit EOF, then this iterator is done and need not be restored. if (_eof) return true; - bool needRestore = false; - - if (_forParallelCollectionScan) { - needRestore = true; - _savedRecoveryUnit = txn->recoveryUnit(); - _cursor.reset(new WiredTigerCursor(_rs.getURI(), _rs.tableId(), true, txn)); - _forParallelCollectionScan = false; // we only do this the first time - } - invariant(_savedRecoveryUnit == txn->recoveryUnit()); - - if (!needRestore && wt_keeptxnopen()) - return true; if (_lastReturnedId.isNull()) return true; - // This will ensure an active session exists, so any restored cursors will bind to it - invariant(WiredTigerRecoveryUnit::get(txn)->getSession(txn) == _cursor->getSession()); - WT_CURSOR* c = _cursor->get(); c->set_key(c, _makeKey(_lastReturnedId)); @@ -276,6 +248,16 @@ public: return true; } + void detachFromOperationContext() final { + _txn = nullptr; + _cursor = {}; + } + + void reattachToOperationContext(OperationContext* txn) final { + _txn = txn; + // _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues. + } + private: bool isVisible(const RecordId& id) { if (!_rs._isCapped) @@ -297,10 +279,8 @@ private: const WiredTigerRecordStore& _rs; OperationContext* _txn; - RecoveryUnit* _savedRecoveryUnit; // only used to sanity check between save/restore. const bool _forward; - bool _forParallelCollectionScan; // This can go away once SERVER-17364 is resolved. - std::unique_ptr<WiredTigerCursor> _cursor; + boost::optional<WiredTigerCursor> _cursor; bool _eof = false; RecordId _lastReturnedId; // If null, need to seek to first/last record. const RecordId _readUntilForOplog; @@ -906,10 +886,7 @@ std::unique_ptr<RecordCursor> WiredTigerRecordStore::getCursor(OperationContext* std::vector<std::unique_ptr<RecordCursor>> WiredTigerRecordStore::getManyCursors( OperationContext* txn) const { std::vector<std::unique_ptr<RecordCursor>> cursors(1); - cursors[0] = stdx::make_unique<Cursor>(txn, - *this, - /*forward=*/true, - /*forParallelCollectionScan=*/true); + cursors[0] = stdx::make_unique<Cursor>(txn, *this, /*forward=*/true); return cursors; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 2dd31906af8..38447246664 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -735,7 +735,7 @@ TEST(WiredTigerRecordStoreTest, CappedCursorRollover) { } // cursor should now be dead - ASSERT_FALSE(cursor->restore(cursorCtx.get())); + ASSERT_FALSE(cursor->restore()); ASSERT(!cursor->next()); } @@ -871,7 +871,7 @@ TEST(WiredTigerRecordStoreTest, CappedCursorYieldFirst) { // See that things work if you yield before you first call getNext(). cursor->savePositioned(); cursorCtx->recoveryUnit()->abandonSnapshot(); - ASSERT_TRUE(cursor->restore(cursorCtx.get())); + ASSERT_TRUE(cursor->restore()); auto record = cursor->next(); ASSERT_EQ(loc1, record->id); ASSERT(!cursor->next()); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 8ee03af4311..cdc9981df78 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -378,19 +378,6 @@ void WiredTigerRecoveryUnit::_txnOpen(OperationContext* opCtx) { _active = true; } -void WiredTigerRecoveryUnit::beingReleasedFromOperationContext() { - LOG(2) << "WiredTigerRecoveryUnit::beingReleased"; - _currentlySquirreled = true; - if (_active == false && !wt_keeptxnopen()) { - _commit(); - } -} -void WiredTigerRecoveryUnit::beingSetOnOperationContext() { - LOG(2) << "WiredTigerRecoveryUnit::broughtBack"; - _currentlySquirreled = false; -} - - // --------------------- WiredTigerCursor::WiredTigerCursor(const std::string& uri, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index d89ef1f3964..edd2085db29 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -65,9 +65,6 @@ public: virtual void registerChange(Change*); - virtual void beingReleasedFromOperationContext(); - virtual void beingSetOnOperationContext(); - virtual void abandonSnapshot(); // un-used API @@ -80,6 +77,9 @@ public: virtual SnapshotId getSnapshotId() const; Status setReadFromMajorityCommittedSnapshot() final; + bool isReadingFromMajorityCommittedSnapshot() final { + return _readFromMajorityCommittedSnapshot; + } // ---- WT STUFF diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index 383f276d5a3..c05e886468d 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -304,7 +304,7 @@ private: } ++numDeleted; ttlDeletedDocuments.increment(); - if (!exec->restoreState(txn)) { + if (!exec->restoreState()) { return true; } } diff --git a/src/mongo/dbtests/executor_registry.cpp b/src/mongo/dbtests/executor_registry.cpp index dc02f95c58f..cfa67a3bf29 100644 --- a/src/mongo/dbtests/executor_registry.cpp +++ b/src/mongo/dbtests/executor_registry.cpp @@ -157,7 +157,7 @@ public: deregisterExecutor(run.get()); // And clean up anything that happened before. - run->restoreState(&_opCtx); + run->restoreState(); // Make sure that the runner moved forward over the deleted data. We don't see foo==10 // or foo==11. @@ -192,7 +192,7 @@ public: // Unregister and restore state. deregisterExecutor(run.get()); - run->restoreState(&_opCtx); + run->restoreState(); ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL)); ASSERT_EQUALS(10, obj["foo"].numberInt()); @@ -206,7 +206,7 @@ public: // Unregister and restore state. deregisterExecutor(run.get()); - run->restoreState(&_opCtx); + run->restoreState(); // PlanExecutor was killed. ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL)); @@ -237,7 +237,7 @@ public: // Unregister and restore state. deregisterExecutor(run.get()); - run->restoreState(&_opCtx); + run->restoreState(); // PlanExecutor was killed. ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL)); @@ -268,7 +268,7 @@ public: // Unregister and restore state. deregisterExecutor(run.get()); - run->restoreState(&_opCtx); + run->restoreState(); // PlanExecutor was killed. ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL)); @@ -300,7 +300,7 @@ public: // Unregister and restore state. deregisterExecutor(run.get()); - run->restoreState(&_opCtx); + run->restoreState(); ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL)); ASSERT_EQUALS(10, obj["foo"].numberInt()); @@ -316,7 +316,7 @@ public: // Unregister and restore state. deregisterExecutor(run.get()); - run->restoreState(&_opCtx); + run->restoreState(); _ctx.reset(); // PlanExecutor was killed. diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index e24638ee5c7..e557cf844e7 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -419,7 +419,7 @@ public: PlanExecutor* exec = makeCollScanExec(coll, filterObj); // Make a client cursor from the runner. - new ClientCursor(coll->getCursorManager(), exec, ns(), 0, BSONObj()); + new ClientCursor(coll->getCursorManager(), exec, ns(), false, 0, BSONObj()); // There should be one cursor before invalidation, // and zero cursors after invalidation. @@ -446,7 +446,7 @@ public: // Make a client cursor from the runner. ClientCursor* cc = - new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj()); + new ClientCursor(collection->getCursorManager(), exec, ns(), false, 0, BSONObj()); ClientCursorPin ccPin(collection->getCursorManager(), cc->cursorid()); // If the cursor is pinned, it sticks around, @@ -490,7 +490,7 @@ public: PlanExecutor* exec = makeCollScanExec(collection, filterObj); // Make a client cursor from the runner. - new ClientCursor(collection->getCursorManager(), exec, ns(), 0, BSONObj()); + new ClientCursor(collection->getCursorManager(), exec, ns(), false, 0, BSONObj()); } // There should be one cursor before timeout, diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp index f1af8d32704..22fcdc5ad70 100644 --- a/src/mongo/dbtests/query_stage_and.cpp +++ b/src/mongo/dbtests/query_stage_and.cpp @@ -223,7 +223,7 @@ public: } } size_t memUsageAfter = ah->getMemUsage(); - ah->restoreState(&_txn); + ah->restoreState(); // Invalidating a read object should decrease memory usage. ASSERT_LESS_THAN(memUsageAfter, memUsageBefore); @@ -331,7 +331,7 @@ public: // Look ahead results do not count towards memory usage. ASSERT_EQUALS(memUsageBefore, memUsageAfter); - ah->restoreState(&_txn); + ah->restoreState(); // The deleted obj should show up in flagged. ASSERT_EQUALS(size_t(1), flagged.size()); @@ -1056,7 +1056,7 @@ public: ah->saveState(); ah->invalidate(&_txn, *data.begin(), INVALIDATION_DELETION); remove(coll->docFor(&_txn, *data.begin()).value()); - ah->restoreState(&_txn); + ah->restoreState(); // Make sure the nuked obj is actually in the flagged data. ASSERT_EQUALS(ws.getFlagged().size(), size_t(1)); @@ -1099,7 +1099,7 @@ public: ah->saveState(); ah->invalidate(&_txn, *it, INVALIDATION_DELETION); remove(coll->docFor(&_txn, *it).value()); - ah->restoreState(&_txn); + ah->restoreState(); // Get all results aside from the two we killed. while (!ah->isEOF()) { diff --git a/src/mongo/dbtests/query_stage_collscan.cpp b/src/mongo/dbtests/query_stage_collscan.cpp index 33341c12b5e..eea912d4bd4 100644 --- a/src/mongo/dbtests/query_stage_collscan.cpp +++ b/src/mongo/dbtests/query_stage_collscan.cpp @@ -301,7 +301,7 @@ public: scan->saveState(); scan->invalidate(&_txn, locs[count], INVALIDATION_DELETION); remove(coll->docFor(&_txn, locs[count]).value()); - scan->restoreState(&_txn); + scan->restoreState(); // Skip over locs[count]. ++count; @@ -362,7 +362,7 @@ public: scan->saveState(); scan->invalidate(&_txn, locs[count], INVALIDATION_DELETION); remove(coll->docFor(&_txn, locs[count]).value()); - scan->restoreState(&_txn); + scan->restoreState(); // Skip over locs[count]. ++count; diff --git a/src/mongo/dbtests/query_stage_count.cpp b/src/mongo/dbtests/query_stage_count.cpp index 82952021dca..b1273f6f7f8 100644 --- a/src/mongo/dbtests/query_stage_count.cpp +++ b/src/mongo/dbtests/query_stage_count.cpp @@ -182,7 +182,7 @@ public: } // resume from yield - count_stage.restoreState(&_txn); + count_stage.restoreState(); } return static_cast<const CountStats*>(count_stage.getSpecificStats()); diff --git a/src/mongo/dbtests/query_stage_count_scan.cpp b/src/mongo/dbtests/query_stage_count_scan.cpp index 012a2442bad..e499e06cbdc 100644 --- a/src/mongo/dbtests/query_stage_count_scan.cpp +++ b/src/mongo/dbtests/query_stage_count_scan.cpp @@ -329,7 +329,7 @@ public: count.saveState(); // Recover from yield - count.restoreState(&_txn); + count.restoreState(); // finish counting while (PlanStage::IS_EOF != countState) { @@ -385,7 +385,7 @@ public: remove(BSON("a" << GTE << 5)); // Recover from yield - count.restoreState(&_txn); + count.restoreState(); // finish counting while (PlanStage::IS_EOF != countState) { @@ -444,7 +444,7 @@ public: insert(BSON("a" << 6.5)); // Recover from yield - count.restoreState(&_txn); + count.restoreState(); // finish counting while (PlanStage::IS_EOF != countState) { @@ -500,7 +500,7 @@ public: insert(BSON("a" << BSON_ARRAY(10 << 11))); // Recover from yield - count.restoreState(&_txn); + count.restoreState(); // finish counting while (PlanStage::IS_EOF != countState) { @@ -623,7 +623,7 @@ public: remove(BSON("a" << 1 << "b" << 5)); // Recover from yield - count.restoreState(&_txn); + count.restoreState(); // finish counting while (PlanStage::IS_EOF != countState) { diff --git a/src/mongo/dbtests/query_stage_delete.cpp b/src/mongo/dbtests/query_stage_delete.cpp index 5c7e888e552..5aeea451a22 100644 --- a/src/mongo/dbtests/query_stage_delete.cpp +++ b/src/mongo/dbtests/query_stage_delete.cpp @@ -168,7 +168,7 @@ public: BSONObj targetDoc = coll->docFor(&_txn, locs[targetDocIndex]).value(); ASSERT(!targetDoc.isEmpty()); remove(targetDoc); - deleteStage.restoreState(&_txn); + deleteStage.restoreState(); // Remove the rest. while (!deleteStage.isEOF()) { diff --git a/src/mongo/dbtests/query_stage_ixscan.cpp b/src/mongo/dbtests/query_stage_ixscan.cpp index d6fcaf177d4..7295802d730 100644 --- a/src/mongo/dbtests/query_stage_ixscan.cpp +++ b/src/mongo/dbtests/query_stage_ixscan.cpp @@ -195,7 +195,7 @@ public: ixscan->saveState(); insert(fromjson("{_id: 4, x: 10}")); insert(fromjson("{_id: 5, x: 11}")); - ixscan->restoreState(&_txn); + ixscan->restoreState(); member = getNext(ixscan.get()); ASSERT_EQ(WorkingSetMember::LOC_AND_IDX, member->getState()); @@ -228,7 +228,7 @@ public: // Save state and insert an indexed doc. ixscan->saveState(); insert(fromjson("{_id: 4, x: 7}")); - ixscan->restoreState(&_txn); + ixscan->restoreState(); member = getNext(ixscan.get()); ASSERT_EQ(WorkingSetMember::LOC_AND_IDX, member->getState()); @@ -261,7 +261,7 @@ public: // Save state and insert an indexed doc. ixscan->saveState(); insert(fromjson("{_id: 4, x: 10}")); - ixscan->restoreState(&_txn); + ixscan->restoreState(); // Ensure that we're EOF and we don't erroneously return {'': 12}. WorkingSetID id; @@ -295,7 +295,7 @@ public: ixscan->saveState(); insert(fromjson("{_id: 4, x: 6}")); insert(fromjson("{_id: 5, x: 9}")); - ixscan->restoreState(&_txn); + ixscan->restoreState(); // Ensure that we don't erroneously return {'': 9} or {'':3}. member = getNext(ixscan.get()); diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index 55f27b3059e..e2d8f41ed39 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -587,7 +587,7 @@ public: // Invalidate locs[11]. Should force a fetch. We don't get it back. ms->saveState(); ms->invalidate(&_txn, *it, INVALIDATION_DELETION); - ms->restoreState(&_txn); + ms->restoreState(); // Make sure locs[11] was fetched for us. { diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index b28abb5b1d5..d6f9ab81d1c 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -345,7 +345,7 @@ public: coll->updateDocument(&_txn, *it, oldDoc, newDoc, false, false, NULL, args); wuow.commit(); } - exec->restoreState(&_txn); + exec->restoreState(); // Read the rest of the data from the queued data stage. while (!ms->isEOF()) { @@ -364,7 +364,7 @@ public: wuow.commit(); } } - exec->restoreState(&_txn); + exec->restoreState(); // Verify that it's sorted, the right number of documents are returned, and they're all // in the expected range. @@ -444,7 +444,7 @@ public: coll->deleteDocument(&_txn, *it++, false, false, NULL); wuow.commit(); } - exec->restoreState(&_txn); + exec->restoreState(); // Read the rest of the data from the queued data stage. while (!ms->isEOF()) { @@ -461,7 +461,7 @@ public: wuow.commit(); } } - exec->restoreState(&_txn); + exec->restoreState(); // Regardless of storage engine, all the documents should come back with their objects int count = 0; diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp index 3290e40ad6c..330acf33b8f 100644 --- a/src/mongo/dbtests/query_stage_update.cpp +++ b/src/mongo/dbtests/query_stage_update.cpp @@ -318,7 +318,7 @@ public: BSONObj targetDoc = coll->docFor(&_txn, locs[targetDocIndex]).value(); ASSERT(!targetDoc.isEmpty()); remove(targetDoc); - updateStage->restoreState(&_txn); + updateStage->restoreState(); // Do the remaining updates. while (!updateStage->isEOF()) { |