| | | |
|---|---|---|
| author | Mathias Stearn <mathias@10gen.com> | 2015-05-18 18:01:38 -0400 |
| committer | Mathias Stearn <mathias@10gen.com> | 2015-06-09 16:33:22 -0400 |
| commit | 3b731debe162706cbbfabd9578bbb57ab5a7a7d8 (patch) | |
| tree | 7b26f5541e5de0060bf75f5563b37cae5a246ee8 /src | |
| parent | f50d1d0b7df924926855badd3cd700653f75f0f8 (diff) | |
| download | mongo-3b731debe162706cbbfabd9578bbb57ab5a7a7d8.tar.gz | |
SERVER-16444 New API for navigating RecordStores
Diffstat (limited to 'src')
67 files changed, 1366 insertions, 1573 deletions
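Before the per-file hunks, here is the calling-convention change in a nutshell. The fragment below is condensed from the hunks that follow (kv_catalog.cpp, repair_database.cpp, and others); `scanAllOld`/`scanAllNew` are illustrative wrapper names, not functions added by this patch.

```cpp
#include "mongo/db/storage/record_store.h"

namespace mongo {

// Old API: the caller owns a raw RecordIterator and drives it with
// isEOF()/getNext()/dataFor().
void scanAllOld(OperationContext* txn, const RecordStore* rs) {
    boost::scoped_ptr<RecordIterator> it(rs->getIterator(txn));
    while (!it->isEOF()) {
        const RecordId loc = it->getNext();        // returns the current id, then advances
        const RecordData data = it->dataFor(loc);
        // ... use loc / data ...
    }
}

// New API: getCursor() hands back a std::unique_ptr<RecordCursor>, and next()
// yields boost::optional<Record> carrying both the id and the data.
void scanAllNew(OperationContext* txn, const RecordStore* rs) {
    auto cursor = rs->getCursor(txn);
    while (auto record = cursor->next()) {
        // ... use record->id / record->data ...
    }
}

}  // namespace mongo
```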
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index cd044530418..4e2c2155254 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -203,19 +203,17 @@ namespace { return true; } - RecordIterator* Collection::getIterator( OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const { + std::unique_ptr<RecordCursor> Collection::getCursor(OperationContext* txn, bool forward) const { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); invariant( ok() ); - return _recordStore->getIterator( txn, start, dir ); + return _recordStore->getCursor(txn, forward); } - vector<RecordIterator*> Collection::getManyIterators( OperationContext* txn ) const { + vector<std::unique_ptr<RecordCursor>> Collection::getManyCursors(OperationContext* txn) const { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - return _recordStore->getManyIterators(txn); + return _recordStore->getManyCursors(txn); } Snapshotted<BSONObj> Collection::docFor(OperationContext* txn, const RecordId& loc) const { @@ -371,12 +369,6 @@ namespace { return loc; } - RecordFetcher* Collection::documentNeedsFetch( OperationContext* txn, - const RecordId& loc ) const { - return _recordStore->recordNeedsFetch( txn, loc ); - } - - StatusWith<RecordId> Collection::_insertDocument( OperationContext* txn, const BSONObj& docToInsert, bool enforceQuota ) { diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index b7b3dfd8ed9..5f5609bd437 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -61,8 +61,8 @@ namespace mongo { class MultiIndexBlock; class OpDebug; class OperationContext; + class RecordCursor; class RecordFetcher; - class RecordIterator; class UpdateDriver; class UpdateRequest; @@ -184,20 +184,13 @@ namespace mongo { */ bool findDoc(OperationContext* txn, const RecordId& loc, Snapshotted<BSONObj>* out) const; - // ---- things that should move to a CollectionAccessMethod like thing - /** - * Default arguments will return all items in the collection. - */ - RecordIterator* getIterator( OperationContext* txn, - const RecordId& start = RecordId(), - const CollectionScanParams::Direction& dir = CollectionScanParams::FORWARD ) const; + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward = true) const; /** - * Returns many iterators that partition the Collection into many disjoint sets. Iterating - * all returned iterators is equivalent to Iterating the full collection. - * Caller owns all pointers in the vector. + * Returns many cursors that partition the Collection into many disjoint sets. Iterating + * all returned cursors is equivalent to iterating the full collection. */ - std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const; + std::vector<std::unique_ptr<RecordCursor>> getManyCursors(OperationContext* txn) const; void deleteDocument( OperationContext* txn, const RecordId& loc, @@ -230,18 +223,6 @@ namespace mongo { bool enforceQuota ); /** - * If the document at 'loc' is unlikely to be in physical memory, the storage - * engine gives us back a RecordFetcher functor which we can invoke in order - * to page fault on that record. - * - * Returns NULL if the document does not need to be fetched. - * - * Caller takes ownership of the returned RecordFetcher*. 
- */ - RecordFetcher* documentNeedsFetch( OperationContext* txn, - const RecordId& loc ) const; - - /** * updates the document @ oldLocation with newDoc * if the document fits in the old space, it is put there * if not, it is moved diff --git a/src/mongo/db/catalog/index_catalog.cpp b/src/mongo/db/catalog/index_catalog.cpp index 63214941bc4..9e20c50656c 100644 --- a/src/mongo/db/catalog/index_catalog.cpp +++ b/src/mongo/db/catalog/index_catalog.cpp @@ -277,9 +277,9 @@ namespace { // which allows creation of indexes using new plugins. RecordStore* indexes = dbce->getRecordStore(dbce->name() + ".system.indexes"); - boost::scoped_ptr<RecordIterator> it(indexes->getIterator(txn)); - while (!it->isEOF()) { - const BSONObj index = it->dataFor(it->getNext()).toBson(); + auto cursor = indexes->getCursor(txn); + while (auto record = cursor->next()) { + const BSONObj index = record->data.releaseToBson(); const BSONObj key = index.getObjectField("key"); const string plugin = IndexNames::findPluginName(key); if ( IndexNames::existedBefore24(plugin) ) diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 361500933f5..ebff993ab42 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -204,18 +204,18 @@ namespace { { // Copy over all the data from source collection to target collection. - boost::scoped_ptr<RecordIterator> sourceIt(sourceColl->getIterator(txn)); - while (!sourceIt->isEOF()) { + auto cursor = sourceColl->getCursor(txn); + while (auto record = cursor->next()) { txn->checkForInterrupt(); - const Snapshotted<BSONObj> obj = sourceColl->docFor(txn, sourceIt->getNext()); + const auto obj = record->data.releaseToBson(); WriteUnitOfWork wunit(txn); // No logOp necessary because the entire renameCollection command is one logOp. 
bool shouldReplicateWrites = txn->writesAreReplicated(); txn->setReplicatedWrites(false); Status status = - targetColl->insertDocument(txn, obj.value(), &indexer, true).getStatus(); + targetColl->insertDocument(txn, obj, &indexer, true).getStatus(); txn->setReplicatedWrites(shouldReplicateWrites); if (!status.isOK()) return status; diff --git a/src/mongo/db/commands/parallel_collection_scan.cpp b/src/mongo/db/commands/parallel_collection_scan.cpp index 618e3306015..08b760825d6 100644 --- a/src/mongo/db/commands/parallel_collection_scan.cpp +++ b/src/mongo/db/commands/parallel_collection_scan.cpp @@ -100,8 +100,7 @@ namespace mongo { "numCursors has to be between 1 and 10000" << " was: " << numCursors ) ); - OwnedPointerVector<RecordIterator> iterators(collection->getManyIterators(txn)); - + auto iterators = collection->getManyCursors(txn); if (iterators.size() < numCursors) { numCursors = iterators.size(); } @@ -135,9 +134,9 @@ namespace mongo { MultiIteratorStage* mis = static_cast<MultiIteratorStage*>(theExec->getRootStage()); // This wasn't called above as they weren't assigned yet - iterators[i]->saveState(); + iterators[i]->savePositioned(); - mis->addIterator(iterators.releaseAt(i)); + mis->addIterator(std::move(iterators[i])); } { diff --git a/src/mongo/db/commands/repair_cursor.cpp b/src/mongo/db/commands/repair_cursor.cpp index 966b196d64e..8828bdcdae0 100644 --- a/src/mongo/db/commands/repair_cursor.cpp +++ b/src/mongo/db/commands/repair_cursor.cpp @@ -80,9 +80,8 @@ namespace mongo { "ns does not exist: " + ns.ns())); } - std::auto_ptr<RecordIterator> iter( - collection->getRecordStore()->getIteratorForRepair(txn)); - if (iter.get() == NULL) { + auto cursor = collection->getRecordStore()->getCursorForRepair(txn); + if (!cursor) { return appendCommandStatus(result, Status(ErrorCodes::CommandNotSupported, "repair iterator not supported")); @@ -91,7 +90,7 @@ namespace mongo { std::auto_ptr<WorkingSet> ws(new WorkingSet()); std::auto_ptr<MultiIteratorStage> stage(new MultiIteratorStage(txn, ws.get(), collection)); - stage->addIterator(iter.release()); + stage->addIterator(std::move(cursor)); PlanExecutor* rawExec; Status execStatus = PlanExecutor::make(txn, diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index ad89c8c1bda..fb8bdf4a311 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -85,34 +85,28 @@ namespace mongo { return PlanStage::DEAD; } - // Do some init if we haven't already. - if (NULL == _iter) { - if (_params.collection == NULL) { - _isDead = true; - Status status(ErrorCodes::InternalError, - "CollectionScan died: collection pointer was null"); - *out = WorkingSetCommon::allocateStatusMember(_workingSet, status); - return PlanStage::DEAD; - } + if ((0 != _params.maxScan) && (_specificStats.docsTested >= _params.maxScan)) { + _commonStats.isEOF = true; + } - try { - if (_lastSeenLoc.isNull()) { - _iter.reset( _params.collection->getIterator( _txn, - _params.start, - _params.direction ) ); - } - else { - invariant(_params.tailable); + if (_commonStats.isEOF) { return PlanStage::IS_EOF; } - _iter.reset( _params.collection->getIterator( _txn, - _lastSeenLoc, - _params.direction ) ); + boost::optional<Record> record; + const bool needToMakeCursor = !_cursor; + try { + if (needToMakeCursor) { + const bool forward = _params.direction == CollectionScanParams::FORWARD; + _cursor = _params.collection->getCursor(_txn, forward); - // Advance _iter past where we were last time. 
If it returns something else, - // mark us as dead since we want to signal an error rather than silently - // dropping data from the stream. This is related to the _lastSeenLoc handling - // in invalidate. - if (_iter->getNext() != _lastSeenLoc) { + if (!_lastSeenId.isNull()) { + invariant(_params.tailable); + // Seek to where we were last time. If it no longer exists, mark us as dead + // since we want to signal an error rather than silently dropping data from the + // stream. This is related to the _lastSeenId handling in invalidate. Note that + // we want to return the record *after* this one since we have already returned + // this one. This is only possible in the tailing case because that is the only + // time we'd need to create a cursor after already getting a record out of it. + if (!_cursor->seekExact(_lastSeenId)) { _isDead = true; Status status(ErrorCodes::InternalError, "CollectionScan died: Unexpected RecordId"); @@ -120,74 +114,57 @@ namespace mongo { return PlanStage::DEAD; } } + + _commonStats.needTime++; + return PlanStage::NEED_TIME; } - catch (const WriteConflictException& wce) { - // Leave us in a state to try again next time. - _iter.reset(); - *out = WorkingSet::INVALID_ID; - return PlanStage::NEED_YIELD; + + if (_lastSeenId.isNull() && !_params.start.isNull()) { + record = _cursor->seekExact(_params.start); } + else { + // See if the record we're about to access is in memory. If not, pass a fetch + // request up. + if (auto fetcher = _cursor->fetcherForNext()) { + // Pass the RecordFetcher up. + WorkingSetMember* member = _workingSet->get(_wsidForFetch); + member->setFetcher(fetcher.release()); + *out = _wsidForFetch; + _commonStats.needYield++; + return PlanStage::NEED_YIELD; + } - ++_commonStats.needTime; - return PlanStage::NEED_TIME; + record = _cursor->next(); + } } - - // Should we try getNext() on the underlying _iter? - if (isEOF()) { - _commonStats.isEOF = true; - return PlanStage::IS_EOF; + catch (const WriteConflictException& wce) { + // Leave us in a state to try again next time. + if (needToMakeCursor) + _cursor.reset(); + *out = WorkingSet::INVALID_ID; + return PlanStage::NEED_YIELD; } - const RecordId curr = _iter->curr(); - if (curr.isNull()) { - // We just hit EOF - if (_params.tailable) - _iter.reset(); // pick up where we left off on the next call to work() - else + if (!record) { + // We just hit EOF. If we are tailable and have already returned data, leave us in a + // state to pick up where we left off on the next call to work(). Otherwise EOF is + // permanent. + if (_params.tailable && !_lastSeenId.isNull()) { + _cursor.reset(); + } + else { _commonStats.isEOF = true; - - return PlanStage::IS_EOF; - } - - _lastSeenLoc = curr; - - // See if the record we're about to access is in memory. If not, pass a fetch request up. - // Note that curr() does not touch the record. This way, we are able to yield before - // fetching the record. - { - std::auto_ptr<RecordFetcher> fetcher( - _params.collection->documentNeedsFetch(_txn, curr)); - if (NULL != fetcher.get()) { - WorkingSetMember* member = _workingSet->get(_wsidForFetch); - member->loc = curr; - // Pass the RecordFetcher off to the WSM. - member->setFetcher(fetcher.release()); - *out = _wsidForFetch; - _commonStats.needYield++; - return PlanStage::NEED_YIELD; } + + return PlanStage::IS_EOF; } - // Do this before advancing because it is more efficient while the iterator is still on this - // document. 
- const Snapshotted<BSONObj> obj = Snapshotted<BSONObj>(_txn->recoveryUnit()->getSnapshotId(), - _iter->dataFor(curr).releaseToBson()); - - // Advance the iterator. - try { - invariant(_iter->getNext() == curr); - } - catch (const WriteConflictException& wce) { - // If getNext thows, it leaves us on the original document. - invariant(_iter->curr() == curr); - *out = WorkingSet::INVALID_ID; - return PlanStage::NEED_YIELD; - } + _lastSeenId = record->id; WorkingSetID id = _workingSet->allocate(); WorkingSetMember* member = _workingSet->get(id); - member->loc = curr; - member->obj = obj; + member->loc = record->id; + member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; return returnIfMatches(member, id, out); @@ -211,17 +188,11 @@ namespace mongo { } bool CollectionScan::isEOF() { - if ((0 != _params.maxScan) && (_specificStats.docsTested >= _params.maxScan)) { - return true; - } - if (_isDead) { return true; } - if (NULL == _iter) { return false; } - if (_params.tailable) { return false; } // tailable cursors can return data later. - return _iter->isEOF(); + return _commonStats.isEOF || _isDead; } void CollectionScan::invalidate(OperationContext* txn, - const RecordId& dl, + const RecordId& id, InvalidationType type) { ++_commonStats.invalidates; @@ -231,14 +202,14 @@ namespace mongo { return; } - // If we're here, 'dl' is being deleted. + // If we're here, 'id' is being deleted. - // Deletions can harm the underlying RecordIterator so we must pass them down. - if (NULL != _iter) { - _iter->invalidate(dl); + // Deletions can harm the underlying RecordCursor so we must pass them down. + if (_cursor) { + _cursor->invalidate(id); } - if (_params.tailable && dl == _lastSeenLoc) { + if (_params.tailable && id == _lastSeenId) { // This means that deletes have caught up to the reader. We want to error in this case // so readers don't miss potentially important data. _isDead = true; @@ -248,8 +219,8 @@ namespace mongo { void CollectionScan::saveState() { _txn = NULL; ++_commonStats.yields; - if (NULL != _iter) { - _iter->saveState(); + if (_cursor) { + _cursor->savePositioned(); } } @@ -257,8 +228,8 @@ namespace mongo { invariant(_txn == NULL); _txn = opCtx; ++_commonStats.unyields; - if (NULL != _iter) { - if (!_iter->restoreState(opCtx)) { + if (_cursor) { + if (!_cursor->restore(opCtx)) { warning() << "Collection dropped or state deleted during yield of CollectionScan: " << opCtx->getNS(); _isDead = true; diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index 939b293b209..ec19c5e22ee 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -28,7 +28,7 @@ #pragma once -#include <boost/scoped_ptr.hpp> +#include <memory> #include "mongo/db/exec/collection_scan_common.h" #include "mongo/db/exec/plan_stage.h" @@ -37,7 +37,7 @@ namespace mongo { - class RecordIterator; + class RecordCursor; class WorkingSet; class OperationContext; @@ -91,13 +91,13 @@ namespace mongo { // The filter is not owned by us. const MatchExpression* _filter; - boost::scoped_ptr<RecordIterator> _iter; + std::unique_ptr<RecordCursor> _cursor; CollectionScanParams _params; bool _isDead; - RecordId _lastSeenLoc; + RecordId _lastSeenId; // Null if nothing has been returned from _cursor yet. // We allocate a working set member with this id on construction of the stage. It gets // used for all fetch requests, changing the RecordId as appropriate. 
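The collection_scan changes above also move the "is this record in memory?" check from Collection::documentNeedsFetch() onto the cursor itself. A condensed sketch of the new handshake in CollectionScan::work(), taken from the hunk above with the surrounding stage code elided:

```cpp
// Ask the cursor whether touching the next record would page-fault. If so,
// hand the RecordFetcher to the working-set member and request a yield so
// the page-in can happen while locks are released.
if (auto fetcher = _cursor->fetcherForNext()) {
    WorkingSetMember* member = _workingSet->get(_wsidForFetch);
    member->setFetcher(fetcher.release());
    *out = _wsidForFetch;
    _commonStats.needYield++;
    return PlanStage::NEED_YIELD;
}

// Safe to touch the record now.
boost::optional<Record> record = _cursor->next();
```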
diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index 85a847f462c..d5e64607b80 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -136,8 +136,10 @@ namespace mongo { try { // If the snapshot changed, then we have to make sure we have the latest copy of the // doc and that it still matches. + std::unique_ptr<RecordCursor> cursor; if (_txn->recoveryUnit()->getSnapshotId() != member->obj.snapshotId()) { - if (!WorkingSetCommon::fetch(_txn, member, _collection)) { + cursor = _collection->getCursor(_txn); + if (!WorkingSetCommon::fetch(_txn, member, cursor)) { // Doc is already deleted. Nothing more to do. ++_commonStats.needTime; return PlanStage::NEED_TIME; diff --git a/src/mongo/db/exec/fetch.cpp b/src/mongo/db/exec/fetch.cpp index 5fd1a3c1a98..481640cc971 100644 --- a/src/mongo/db/exec/fetch.cpp +++ b/src/mongo/db/exec/fetch.cpp @@ -104,24 +104,22 @@ namespace mongo { verify(WorkingSetMember::LOC_AND_IDX == member->state); verify(member->hasLoc()); - // We might need to retrieve 'nextLoc' from secondary storage, in which case we send - // a NEED_YIELD request up to the PlanExecutor. - std::auto_ptr<RecordFetcher> fetcher(_collection->documentNeedsFetch(_txn, - member->loc)); - if (NULL != fetcher.get()) { - // There's something to fetch. Hand the fetcher off to the WSM, and pass up - // a fetch request. - _idRetrying = id; - member->setFetcher(fetcher.release()); - *out = id; - _commonStats.needYield++; - return NEED_YIELD; - } - - // The doc is already in memory, so go ahead and grab it. Now we have a RecordId - // as well as an unowned object try { - if (!WorkingSetCommon::fetch(_txn, member, _collection)) { + if (!_cursor) _cursor = _collection->getCursor(_txn); + + if (auto fetcher = _cursor->fetcherForId(member->loc)) { + // There's something to fetch. Hand the fetcher off to the WSM, and pass up + // a fetch request. + _idRetrying = id; + member->setFetcher(fetcher.release()); + *out = id; + _commonStats.needYield++; + return NEED_YIELD; + } + + // The doc is already in memory, so go ahead and grab it. Now we have a RecordId + // as well as an unowned object + if (!WorkingSetCommon::fetch(_txn, member, _cursor)) { _ws->free(id); _commonStats.needTime++; return NEED_TIME; @@ -164,6 +162,7 @@ namespace mongo { void FetchStage::saveState() { _txn = NULL; ++_commonStats.yields; + if (_cursor) _cursor->saveUnpositioned(); _child->saveState(); } @@ -171,6 +170,7 @@ namespace mongo { invariant(_txn == NULL); _txn = opCtx; ++_commonStats.unyields; + if (_cursor) _cursor->restore(opCtx); _child->restoreState(opCtx); } diff --git a/src/mongo/db/exec/fetch.h b/src/mongo/db/exec/fetch.h index 18fbeedb3db..b43a38bb7eb 100644 --- a/src/mongo/db/exec/fetch.h +++ b/src/mongo/db/exec/fetch.h @@ -28,7 +28,7 @@ #pragma once -#include <boost/scoped_ptr.hpp> +#include <memory> #include "mongo/db/exec/plan_stage.h" #include "mongo/db/jsobj.h" @@ -37,6 +37,8 @@ namespace mongo { + class RecordCursor; + /** * This stage turns a RecordId into a BSONObj. * @@ -88,10 +90,12 @@ namespace mongo { // Collection which is used by this stage. Used to resolve record ids retrieved by child // stages. The lifetime of the collection must supersede that of the stage. const Collection* _collection; + // Used to fetch Records from _collection. + std::unique_ptr<RecordCursor> _cursor; // _ws is not owned by us. WorkingSet* _ws; - boost::scoped_ptr<PlanStage> _child; + std::unique_ptr<PlanStage> _child; // The filter is not owned by us. 
const MatchExpression* _filter; diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index 26de70dc7aa..149d20e3509 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -97,11 +97,12 @@ namespace mongo { if (_done) { return PlanStage::IS_EOF; } if (WorkingSet::INVALID_ID != _idBeingPagedIn) { + invariant(_recordCursor); WorkingSetID id = _idBeingPagedIn; _idBeingPagedIn = WorkingSet::INVALID_ID; WorkingSetMember* member = _workingSet->get(id); - invariant(WorkingSetCommon::fetchIfUnfetched(_txn, member, _collection)); + invariant(WorkingSetCommon::fetchIfUnfetched(_txn, member, _recordCursor)); return advance(id, member, out); } @@ -136,9 +137,10 @@ namespace mongo { member->state = WorkingSetMember::LOC_AND_IDX; member->loc = loc; + if (!_recordCursor) _recordCursor = _collection->getCursor(_txn); + // We may need to request a yield while we fetch the document. - std::auto_ptr<RecordFetcher> fetcher(_collection->documentNeedsFetch(_txn, loc)); - if (NULL != fetcher.get()) { + if (auto fetcher = _recordCursor->fetcherForId(loc)) { // There's something to fetch. Hand the fetcher off to the WSM, and pass up a // fetch request. _idBeingPagedIn = id; @@ -149,7 +151,7 @@ namespace mongo { } // The doc was already in memory, so we go ahead and return it. - if (!WorkingSetCommon::fetch(_txn, member, _collection)) { + if (!WorkingSetCommon::fetch(_txn, member, _recordCursor)) { // _id is immutable so the index would return the only record that could // possibly match the query. _workingSet->free(id); @@ -162,6 +164,7 @@ namespace mongo { } catch (const WriteConflictException& wce) { // Restart at the beginning on retry. + _recordCursor.reset(); if (id != WorkingSet::INVALID_ID) _workingSet->free(id); @@ -192,12 +195,14 @@ namespace mongo { void IDHackStage::saveState() { _txn = NULL; ++_commonStats.yields; + if (_recordCursor) _recordCursor->saveUnpositioned(); } void IDHackStage::restoreState(OperationContext* opCtx) { invariant(_txn == NULL); _txn = opCtx; ++_commonStats.unyields; + if (_recordCursor) _recordCursor->restore(opCtx); } void IDHackStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { diff --git a/src/mongo/db/exec/idhack.h b/src/mongo/db/exec/idhack.h index d1f1020342d..5430addd12b 100644 --- a/src/mongo/db/exec/idhack.h +++ b/src/mongo/db/exec/idhack.h @@ -28,6 +28,8 @@ #pragma once +#include <memory> + #include "mongo/db/catalog/collection.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/query/canonical_query.h" @@ -35,6 +37,8 @@ namespace mongo { + class RecordCursor; + /** * A standalone stage implementing the fast path for key-value retrievals * via the _id index. @@ -88,6 +92,8 @@ namespace mongo { // Not owned here. const Collection* _collection; + std::unique_ptr<RecordCursor> _recordCursor; + // The WorkingSet we annotate with results. Not owned by us. 
WorkingSet* _workingSet; diff --git a/src/mongo/db/exec/multi_iterator.cpp b/src/mongo/db/exec/multi_iterator.cpp index 139b1a5834c..f8aeaac8ca5 100644 --- a/src/mongo/db/exec/multi_iterator.cpp +++ b/src/mongo/db/exec/multi_iterator.cpp @@ -54,8 +54,8 @@ namespace mongo { member->state = WorkingSetMember::LOC_AND_OWNED_OBJ; } - void MultiIteratorStage::addIterator(RecordIterator* it) { - _iterators.push_back(it); + void MultiIteratorStage::addIterator(std::unique_ptr<RecordCursor> it) { + _iterators.push_back(std::move(it)); } PlanStage::StageState MultiIteratorStage::work(WorkingSetID* out) { @@ -66,47 +66,36 @@ namespace mongo { return PlanStage::DEAD; } - if (_iterators.empty()) - return PlanStage::IS_EOF; - - const RecordId curr = _iterators.back()->curr(); - Snapshotted<BSONObj> obj; - - // The RecordId we're about to look at it might not be in memory. In this case - // we request a yield while we fetch the document. - if (!curr.isNull()) { - std::auto_ptr<RecordFetcher> fetcher(_collection->documentNeedsFetch(_txn, curr)); - if (NULL != fetcher.get()) { - WorkingSetMember* member = _ws->get(_wsidForFetch); - member->loc = curr; - // Pass the RecordFetcher off to the WSM on which we're performing the fetch. - member->setFetcher(fetcher.release()); - *out = _wsidForFetch; - return NEED_YIELD; - } - - obj = Snapshotted<BSONObj>(_txn->recoveryUnit()->getSnapshotId(), - _iterators.back()->dataFor(curr).releaseToBson()); - } - + boost::optional<Record> record; try { - _advance(); + while (!_iterators.empty()) { + if (auto fetcher = _iterators.back()->fetcherForNext()) { + // Pass the RecordFetcher off up. + WorkingSetMember* member = _ws->get(_wsidForFetch); + member->setFetcher(fetcher.release()); + *out = _wsidForFetch; + return NEED_YIELD; + } + + record = _iterators.back()->next(); + if (record) break; + _iterators.pop_back(); + } } catch (const WriteConflictException& wce) { // If _advance throws a WCE we shouldn't have moved. 
invariant(!_iterators.empty()); - invariant(_iterators.back()->curr() == curr); *out = WorkingSet::INVALID_ID; return NEED_YIELD; } - if (curr.isNull()) - return NEED_TIME; + if (!record) + return IS_EOF; *out = _ws->allocate(); WorkingSetMember* member = _ws->get(*out); - member->loc = curr; - member->obj = obj; + member->loc = record->id; + member->obj = {_txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; return PlanStage::ADVANCED; } @@ -123,7 +112,7 @@ namespace mongo { void MultiIteratorStage::saveState() { _txn = NULL; for (size_t i = 0; i < _iterators.size(); i++) { - _iterators[i]->saveState(); + _iterators[i]->savePositioned(); } } @@ -131,7 +120,7 @@ namespace mongo { invariant(_txn == NULL); _txn = opCtx; for (size_t i = 0; i < _iterators.size(); i++) { - if (!_iterators[i]->restoreState(opCtx)) { + if (!_iterators[i]->restore(opCtx)) { kill(); } } @@ -164,13 +153,4 @@ namespace mongo { return ret.release(); } - void MultiIteratorStage::_advance() { - if (_iterators.back()->isEOF()) { - _iterators.popAndDeleteBack(); - } - else { - _iterators.back()->getNext(); - } - } - } // namespace mongo diff --git a/src/mongo/db/exec/multi_iterator.h b/src/mongo/db/exec/multi_iterator.h index cc2256815dd..ac2cf44b007 100644 --- a/src/mongo/db/exec/multi_iterator.h +++ b/src/mongo/db/exec/multi_iterator.h @@ -28,6 +28,9 @@ #pragma once +#include <memory> +#include <vector> + #include "mongo/db/catalog/collection.h" #include "mongo/db/exec/plan_stage.h" #include "mongo/db/exec/plan_stats.h" @@ -36,10 +39,10 @@ namespace mongo { /** - * Iterates over a collection using multiple underlying RecordIterators. + * Iterates over a collection using multiple underlying RecordCursors. * * This is a special stage which is not used automatically by queries. It is intended for - * special commands that work with RecordIterators. For example, it is used by the + * special commands that work with RecordCursors. For example, it is used by the * parallelCollectionScan and repairCursor commands */ class MultiIteratorStage : public PlanStage { @@ -48,10 +51,7 @@ namespace mongo { ~MultiIteratorStage() { } - /** - * Takes ownership of 'it'. - */ - void addIterator(RecordIterator* it); + void addIterator(std::unique_ptr<RecordCursor> it); virtual PlanStage::StageState work(WorkingSetID* out); @@ -82,12 +82,9 @@ namespace mongo { static const char* kStageType; private: - - void _advance(); - OperationContext* _txn; Collection* _collection; - OwnedPointerVector<RecordIterator> _iterators; + std::vector<std::unique_ptr<RecordCursor>> _iterators; // Not owned by us. WorkingSet* _ws; diff --git a/src/mongo/db/exec/oplogstart.cpp b/src/mongo/db/exec/oplogstart.cpp index 76ec95021d9..92de52db505 100644 --- a/src/mongo/db/exec/oplogstart.cpp +++ b/src/mongo/db/exec/oplogstart.cpp @@ -98,29 +98,34 @@ namespace mongo { } // we work from the back to the front since the back has the newest data. - const RecordId loc = _subIterators.back()->curr(); - if (loc.isNull()) return PlanStage::NEED_TIME; - - // TODO: should we ever try and return NEED_YIELD here? 
- const BSONObj obj = _subIterators.back()->dataFor(loc).releaseToBson(); - if (!_filter->matchesBSON(obj)) { - _done = true; - WorkingSetID id = _workingSet->allocate(); - WorkingSetMember* member = _workingSet->get(id); - member->loc = loc; - member->obj = Snapshotted<BSONObj>(_txn->recoveryUnit()->getSnapshotId(), obj); - member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; - *out = id; - return PlanStage::ADVANCED; + try { + // TODO: should we ever check fetcherForNext()? + if (auto record = _subIterators.back()->next()) { + BSONObj obj = record->data.releaseToBson(); + if (!_filter->matchesBSON(obj)) { + _done = true; + WorkingSetID id = _workingSet->allocate(); + WorkingSetMember* member = _workingSet->get(id); + member->loc = record->id; + member->obj = {_txn->recoveryUnit()->getSnapshotId(), std::move(obj)}; + member->state = WorkingSetMember::LOC_AND_UNOWNED_OBJ; + *out = id; + return PlanStage::ADVANCED; + } + } + } + catch (const WriteConflictException& wce) { + *out = WorkingSet::INVALID_ID; + return PlanStage::NEED_YIELD; } - _subIterators.popAndDeleteBack(); + _subIterators.pop_back(); return PlanStage::NEED_TIME; } void OplogStart::switchToExtentHopping() { // Set up our extent hopping state. - _subIterators = _collection->getManyIterators(_txn); + _subIterators = _collection->getManyCursors(_txn); // Transition from backwards scanning to extent hopping. _backwardsScanning = false; @@ -179,7 +184,7 @@ namespace mongo { } for (size_t i = 0; i < _subIterators.size(); i++) { - _subIterators[i]->saveState(); + _subIterators[i]->savePositioned(); } } @@ -191,7 +196,7 @@ namespace mongo { } for (size_t i = 0; i < _subIterators.size(); i++) { - if (!_subIterators[i]->restoreState(opCtx)) { + if (!_subIterators[i]->restore(opCtx)) { _subIterators.erase(_subIterators.begin() + i); // need to hit same i on next pass through loop i--; diff --git a/src/mongo/db/exec/oplogstart.h b/src/mongo/db/exec/oplogstart.h index a3592b58f6b..47850eafef8 100644 --- a/src/mongo/db/exec/oplogstart.h +++ b/src/mongo/db/exec/oplogstart.h @@ -110,8 +110,7 @@ namespace mongo { boost::scoped_ptr<CollectionScan> _cs; // This is only used for the extent hopping scan. - typedef OwnedPointerVector<RecordIterator> SubIterators; - SubIterators _subIterators; + std::vector<std::unique_ptr<RecordCursor>> _subIterators; // Have we done our heavy init yet? bool _needInit; diff --git a/src/mongo/db/exec/text.cpp b/src/mongo/db/exec/text.cpp index b049c17b9ff..5a62fc9937d 100644 --- a/src/mongo/db/exec/text.cpp +++ b/src/mongo/db/exec/text.cpp @@ -133,6 +133,8 @@ namespace mongo { for (size_t i = 0; i < _scanners.size(); ++i) { _scanners.mutableVector()[i]->saveState(); } + + if (_recordCursor) _recordCursor->saveUnpositioned(); } void TextStage::restoreState(OperationContext* opCtx) { @@ -143,6 +145,8 @@ namespace mongo { for (size_t i = 0; i < _scanners.size(); ++i) { _scanners.mutableVector()[i]->restoreState(opCtx); } + + if (_recordCursor) invariant(_recordCursor->restore(opCtx)); } void TextStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { @@ -196,6 +200,8 @@ namespace mongo { PlanStage::StageState TextStage::initScans(WorkingSetID* out) { invariant(0 == _scanners.size()); + _recordCursor = _params.index->getCollection()->getCursor(_txn); + _specificStats.parsedTextQuery = _params.query.toBSON(); // Get all the index scans for each term in our query. 
@@ -304,7 +310,7 @@ namespace mongo { WorkingSetMember* wsm = _ws->get(textRecordData.wsid); try { - if (!WorkingSetCommon::fetchIfUnfetched(_txn, wsm, _params.index->getCollection())) { + if (!WorkingSetCommon::fetchIfUnfetched(_txn, wsm, _recordCursor)) { _scoreIterator++; _ws->free(textRecordData.wsid); _commonStats.needTime++; @@ -338,9 +344,9 @@ namespace mongo { const BSONObj& keyPattern, const BSONObj& key, WorkingSetMember* wsm, - const Collection* collection) + unowned_ptr<RecordCursor> recordCursor) : _txn(txn), - _collection(collection), + _recordCursor(recordCursor), _keyPattern(keyPattern), _key(key), _wsm(wsm) { } @@ -384,14 +390,16 @@ namespace mongo { private: BSONObj getObj() const { - if (!WorkingSetCommon::fetchIfUnfetched(_txn, _wsm, _collection)) + if (!WorkingSetCommon::fetchIfUnfetched(_txn, _wsm, _recordCursor)) throw DocumentDeletedException(); + // Make it owned since we are buffering results. + _wsm->obj.setValue(_wsm->obj.value().getOwned()); return _wsm->obj.value(); } OperationContext* _txn; - const Collection* _collection; + unowned_ptr<RecordCursor> _recordCursor; BSONObj _keyPattern; BSONObj _key; WorkingSetMember* _wsm; @@ -420,7 +428,7 @@ namespace mongo { newKeyData.indexKeyPattern, newKeyData.keyData, wsm, - _params.index->getCollection()); + _recordCursor); shouldKeep = _filter->matches(&tdoc); } catch (const WriteConflictException& wce) { diff --git a/src/mongo/db/exec/text.h b/src/mongo/db/exec/text.h index 4a7364e052d..96f5c67bc4d 100644 --- a/src/mongo/db/exec/text.h +++ b/src/mongo/db/exec/text.h @@ -194,6 +194,9 @@ namespace mongo { typedef unordered_map<RecordId, TextRecordData, RecordId::Hasher> ScoreMap; ScoreMap _scores; ScoreMap::const_iterator _scoreIterator; + + // Used for fetching records from the collection. + std::unique_ptr<RecordCursor> _recordCursor; }; } // namespace mongo diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 6eb9460c2b1..80e1816aa51 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -869,9 +869,11 @@ namespace mongo { } try { + std::unique_ptr<RecordCursor> cursor; if (_txn->recoveryUnit()->getSnapshotId() != member->obj.snapshotId()) { + cursor = _collection->getCursor(_txn); // our snapshot has changed, refetch - if (!WorkingSetCommon::fetch(_txn, member, _collection)) { + if (!WorkingSetCommon::fetch(_txn, member, cursor)) { // document was deleted, we're done here ++_commonStats.needTime; return PlanStage::NEED_TIME; diff --git a/src/mongo/db/exec/working_set_common.cpp b/src/mongo/db/exec/working_set_common.cpp index 16f1b25bc95..6f03d1378a1 100644 --- a/src/mongo/db/exec/working_set_common.cpp +++ b/src/mongo/db/exec/working_set_common.cpp @@ -76,7 +76,7 @@ namespace mongo { // static bool WorkingSetCommon::fetch(OperationContext* txn, WorkingSetMember* member, - const Collection* collection) { + unowned_ptr<RecordCursor> cursor) { // The RecordFetcher should already have been transferred out of the WSM and used. invariant(!member->hasFetcher()); @@ -85,10 +85,13 @@ namespace mongo { invariant(member->hasLoc()); member->obj.reset(); - if (!collection->findDoc(txn, member->loc, &member->obj)) { + auto record = cursor->seekExact(member->loc); + if (!record) { return false; } + member->obj = {txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()}; + if (member->isSuspicious) { // Make sure that all of the keyData is still valid for this copy of the document. // This ensures both that index-provided filters and sort orders still hold. 
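The working_set_common.cpp hunk above is the pattern the stages now rely on for refetching: a re-read goes through RecordCursor::seekExact() instead of Collection::findDoc(). Condensed from that hunk (the isSuspicious re-validation and remaining bookkeeping are elided):

```cpp
// WorkingSetCommon::fetch, post-patch: re-read the document through the cursor.
auto record = cursor->seekExact(member->loc);
if (!record) {
    return false;  // the document was deleted out from under us
}
member->obj = {txn->recoveryUnit()->getSnapshotId(), record->data.releaseToBson()};
```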
diff --git a/src/mongo/db/exec/working_set_common.h b/src/mongo/db/exec/working_set_common.h index bc4ceda2da0..aa1ecdb96c6 100644 --- a/src/mongo/db/exec/working_set_common.h +++ b/src/mongo/db/exec/working_set_common.h @@ -29,12 +29,14 @@ #pragma once #include "mongo/db/exec/working_set.h" +#include "mongo/util/unowned_ptr.h" namespace mongo { class CanonicalQuery; class Collection; class OperationContext; + class RecordCursor; class WorkingSetCommon { public: @@ -68,13 +70,13 @@ namespace mongo { */ static bool fetch(OperationContext* txn, WorkingSetMember* member, - const Collection* collection); + unowned_ptr<RecordCursor> cursor); static bool fetchIfUnfetched(OperationContext* txn, WorkingSetMember* member, - const Collection* collection) { + unowned_ptr<RecordCursor> cursor) { if (member->hasObj()) return true; - return fetch(txn, member, collection); + return fetch(txn, member, cursor); } /** diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index 9bbb8da24de..fccf3ef21e3 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -127,22 +127,21 @@ namespace { long long dataSize = 0; RecordStore* rs = collection->getRecordStore(); - boost::scoped_ptr<RecordIterator> it(rs->getIterator(txn)); - while (!it->isEOF()) { - RecordId id = it->curr(); - RecordData data = it->dataFor(id); - invariant(id == it->getNext()); + auto cursor = rs->getCursor(txn); + while (auto record = cursor->next()) { + RecordId id = record->id; + RecordData& data = record->data; Status status = validateBSON(data.data(), data.size()); if (!status.isOK()) { log() << "Invalid BSON detected at " << id << ": " << status << ". Deleting."; - it->saveState(); + cursor->savePositioned(); // 'data' is no longer valid. { WriteUnitOfWork wunit(txn); rs->deleteRecord(txn, id); wunit.commit(); } - it->restoreState(txn); + cursor->restore(txn); continue; } diff --git a/src/mongo/db/storage/README.md b/src/mongo/db/storage/README.md index 16f48f5f0ab..874900547f3 100644 --- a/src/mongo/db/storage/README.md +++ b/src/mongo/db/storage/README.md @@ -107,7 +107,7 @@ details. 
* KVEngine * RecordStore -* RecordIterator +* RecordCursor * RecoveryUnit * SortedDataInterface * ServerStatusSection diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 9ef68a68dbf..1d1d039b7a6 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -34,20 +34,16 @@ #include "mongo/db/storage/in_memory/in_memory_record_store.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" +#include "mongo/stdx/memory.h" namespace mongo { - class EmptyRecordIterator: public RecordIterator { + class EmptyRecordCursor final : public RecordCursor { public: - virtual bool isEOF() { return true; } - virtual RecordId curr() { return RecordId(); } - virtual RecordId getNext() { return RecordId(); } - virtual void invalidate(const RecordId& dl) { } - virtual void saveState() { } - virtual bool restoreState(OperationContext* txn) { return false; } - virtual RecordData dataFor( const RecordId& loc ) const { - invariant( false ); - } + boost::optional<Record> next() final { return {}; } + boost::optional<Record> seekExact(const RecordId& id) final { return {}; } + void savePositioned() final {} + bool restore(OperationContext* txn) final { return true; } }; class DevNullRecordStore : public RecordStore { @@ -120,20 +116,9 @@ namespace mongo { invariant(false); } - virtual RecordIterator* getIterator( OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir ) const { - return new EmptyRecordIterator(); - } - - virtual RecordIterator* getIteratorForRepair( OperationContext* txn ) const { - return new EmptyRecordIterator(); - } - virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const { - std::vector<RecordIterator*> v; - v.push_back( new EmptyRecordIterator() ); - return v; + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward) const final { + return stdx::make_unique<EmptyRecordCursor>(); } virtual Status truncate( OperationContext* txn ) { return Status::OK(); } diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp index 841888cb051..92deaf82810 100644 --- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp +++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp @@ -36,12 +36,14 @@ #include <boost/shared_ptr.hpp> #include "mongo/db/jsobj.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/oplog_hack.h" #include "mongo/db/storage/recovery_unit.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" -#include "mongo/db/namespace_string.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/unowned_ptr.h" namespace mongo { @@ -109,6 +111,152 @@ namespace mongo { Records _records; }; + class InMemoryRecordStore::Cursor final : public RecordCursor { + public: + Cursor(OperationContext* txn, const InMemoryRecordStore& rs) + : _txn(txn) + , _records(rs._data->records) + , _isCapped(rs.isCapped()) + {} + + boost::optional<Record> next() final { + if (_needFirstSeek) { + _needFirstSeek = false; + _it = _records.begin(); + } + else if (!_lastMoveWasRestore && _it != _records.end()) { + ++_it; + } + _lastMoveWasRestore = false; + + if (_it == _records.end()) return {}; + return {{_it->first, _it->second.toRecordData()}}; + } + + boost::optional<Record> seekExact(const RecordId& id) final { + 
_lastMoveWasRestore = false; + _needFirstSeek = false; + _it = _records.find(id); + if (_it == _records.end()) return {}; + return {{_it->first, _it->second.toRecordData()}}; + } + + void savePositioned() final { + _txn = nullptr; + if (!_needFirstSeek && !_lastMoveWasRestore) + _savedId = _it == _records.end() ? RecordId() : _it->first; + } + + void saveUnpositioned() final { + _txn = nullptr; + _savedId = RecordId(); + } + + bool restore(OperationContext* txn) final { + _txn = txn; + if (_savedId.isNull()) { + _it = _records.end(); + return true; + } + + _it = _records.lower_bound(_savedId); + _lastMoveWasRestore = _it == _records.end() || _it->first != _savedId; + + // Capped iterators die on invalidation rather than advancing. + return !(_isCapped && _lastMoveWasRestore); + } + + private: + unowned_ptr<OperationContext> _txn; + Records::const_iterator _it; + bool _needFirstSeek = true; + bool _lastMoveWasRestore = false; + RecordId _savedId; // Location to restore() to. Null means EOF. + + const InMemoryRecordStore::Records& _records; + const bool _isCapped; + }; + + class InMemoryRecordStore::ReverseCursor final : public RecordCursor { + public: + ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs) + : _txn(txn) + , _records(rs._data->records) + , _isCapped(rs.isCapped()) + {} + + boost::optional<Record> next() final { + if (_needFirstSeek) { + _needFirstSeek = false; + _it = _records.rbegin(); + } + else if (!_lastMoveWasRestore && _it != _records.rend()) { + ++_it; + } + _lastMoveWasRestore = false; + + if (_it == _records.rend()) return {}; + return {{_it->first, _it->second.toRecordData()}}; + } + + boost::optional<Record> seekExact(const RecordId& id) final { + _lastMoveWasRestore = false; + _needFirstSeek = false; + + auto forwardIt = _records.find(id); + if (forwardIt == _records.end()) { + _it = _records.rend(); + return {}; + } + + // The reverse_iterator will point to the preceding element, so increment the base + // iterator to make it point past the found element. + ++forwardIt; + _it = Records::const_reverse_iterator(forwardIt); + dassert(_it != _records.rend()); + dassert(_it->first == id); + return {{_it->first, _it->second.toRecordData()}}; + } + + void savePositioned() final { + _txn = nullptr; + if (!_needFirstSeek && !_lastMoveWasRestore) + _savedId = _it == _records.rend() ? RecordId() : _it->first; + } + + void saveUnpositioned() final { + _txn = nullptr; + _savedId = RecordId(); + } + + bool restore(OperationContext* txn) final { + _txn = txn; + if (_savedId.isNull()) { + _it = _records.rend(); + return true; + } + + // Note: upper_bound returns the first entry > _savedId and reverse_iterators + // dereference to the element before their base iterator. This combine to make this + // dereference to the first element <= _savedId which is what we want here. + _it = Records::const_reverse_iterator(_records.upper_bound(_savedId)); + _lastMoveWasRestore = _it == _records.rend() || _it->first != _savedId; + + // Capped iterators die on invalidation rather than advancing. + return !(_isCapped && _lastMoveWasRestore); + } + + private: + unowned_ptr<OperationContext> _txn; + Records::const_reverse_iterator _it; + bool _needFirstSeek = true; + bool _lastMoveWasRestore = false; + RecordId _savedId; // Location to restore() to. Null means EOF. 
+ const InMemoryRecordStore::Records& _records; + const bool _isCapped; + }; + + // // RecordStore // @@ -369,30 +517,11 @@ namespace mongo { return Status::OK(); } - RecordIterator* InMemoryRecordStore::getIterator( - OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const { + std::unique_ptr<RecordCursor> InMemoryRecordStore::getCursor(OperationContext* txn, + bool forward) const { - if (dir == CollectionScanParams::FORWARD) { - return new InMemoryRecordIterator(txn, _data->records, *this, start, false); - } - else { - return new InMemoryRecordReverseIterator(txn, _data->records, *this, start); - } - } - - RecordIterator* InMemoryRecordStore::getIteratorForRepair(OperationContext* txn) const { - // TODO maybe make different from InMemoryRecordIterator - return new InMemoryRecordIterator(txn, _data->records, *this); - } - - std::vector<RecordIterator*> InMemoryRecordStore::getManyIterators( - OperationContext* txn) const { - std::vector<RecordIterator*> out; - // TODO maybe find a way to return multiple iterators. - out.push_back(new InMemoryRecordIterator(txn, _data->records, *this)); - return out; + if (forward) return stdx::make_unique<Cursor>(txn, *this); + return stdx::make_unique<ReverseCursor>(txn, *this); } Status InMemoryRecordStore::truncate(OperationContext* txn) { @@ -498,180 +627,4 @@ namespace mongo { return it->first; } - // - // Forward Iterator - // - - InMemoryRecordIterator::InMemoryRecordIterator(OperationContext* txn, - const InMemoryRecordStore::Records& records, - const InMemoryRecordStore& rs, - RecordId start, - bool tailable) - : _txn(txn), - _tailable(tailable), - _lastLoc(RecordId::min()), - _killedByInvalidate(false), - _records(records), - _rs(rs) { - if (start.isNull()) { - _it = _records.begin(); - } - else { - _it = _records.find(start); - invariant(_it != _records.end()); - } - } - - bool InMemoryRecordIterator::isEOF() { - return _it == _records.end(); - } - - RecordId InMemoryRecordIterator::curr() { - if (isEOF()) - return RecordId(); - return _it->first; - } - - RecordId InMemoryRecordIterator::getNext() { - if (isEOF()) { - if (!_tailable) - return RecordId(); - - if (_records.empty()) - return RecordId(); - - invariant(!_killedByInvalidate); - - // recover to last returned record - invariant(!_lastLoc.isNull()); - _it = _records.find(_lastLoc); - invariant(_it != _records.end()); - - if (++_it == _records.end()) - return RecordId(); - } - - const RecordId out = _it->first; - ++_it; - if (_tailable && _it == _records.end()) - _lastLoc = out; - return out; - } - - void InMemoryRecordIterator::invalidate(const RecordId& loc) { - if (_rs.isCapped()) { - // Capped iterators die on invalidation rather than advancing. 
- if (isEOF()) { - if (_lastLoc == loc) { - _killedByInvalidate = true; - } - } - else if (_it->first == loc) { - _killedByInvalidate = true; - } - - return; - } - - if (_it != _records.end() && _it->first == loc) - ++_it; - } - - void InMemoryRecordIterator::saveState() { - } - - bool InMemoryRecordIterator::restoreState(OperationContext* txn) { - _txn = txn; - return !_killedByInvalidate; - } - - RecordData InMemoryRecordIterator::dataFor(const RecordId& loc) const { - return _rs.dataFor(_txn, loc); - } - - // - // Reverse Iterator - // - - InMemoryRecordReverseIterator::InMemoryRecordReverseIterator( - OperationContext* txn, - const InMemoryRecordStore::Records& records, - const InMemoryRecordStore& rs, - RecordId start) : _txn(txn), - _killedByInvalidate(false), - _records(records), - _rs(rs) { - - if (start.isNull()) { - _it = _records.rbegin(); - } - else { - // The reverse iterator will point to the preceding element, so we - // increment the base iterator to make it point past the found element - InMemoryRecordStore::Records::const_iterator baseIt(++_records.find(start)); - _it = InMemoryRecordStore::Records::const_reverse_iterator(baseIt); - invariant(_it != _records.rend()); - } - } - - bool InMemoryRecordReverseIterator::isEOF() { - return _it == _records.rend(); - } - - RecordId InMemoryRecordReverseIterator::curr() { - if (isEOF()) - return RecordId(); - return _it->first; - } - - RecordId InMemoryRecordReverseIterator::getNext() { - if (isEOF()) - return RecordId(); - - const RecordId out = _it->first; - ++_it; - return out; - } - - void InMemoryRecordReverseIterator::invalidate(const RecordId& loc) { - if (_killedByInvalidate) - return; - - if (_savedLoc == loc) { - if (_rs.isCapped()) { - // Capped iterators die on invalidation rather than advancing. - _killedByInvalidate = true; - return; - } - - restoreState(_txn); - invariant(_it->first == _savedLoc); - ++_it; - saveState(); - } - } - - void InMemoryRecordReverseIterator::saveState() { - if (isEOF()) { - _savedLoc = RecordId(); - } - else { - _savedLoc = _it->first; - } - } - - bool InMemoryRecordReverseIterator::restoreState(OperationContext* txn) { - if (_savedLoc.isNull()) { - _it = _records.rend(); - } - else { - _it = InMemoryRecordStore::Records::const_reverse_iterator(++_records.find(_savedLoc)); - } - return !_killedByInvalidate; - } - - RecordData InMemoryRecordReverseIterator::dataFor(const RecordId& loc) const { - return _rs.dataFor(_txn, loc); - } - } // namespace mongo diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.h b/src/mongo/db/storage/in_memory/in_memory_record_store.h index 239c4f7bb09..e091d75c4a1 100644 --- a/src/mongo/db/storage/in_memory/in_memory_record_store.h +++ b/src/mongo/db/storage/in_memory/in_memory_record_store.h @@ -39,8 +39,6 @@ namespace mongo { - class InMemoryRecordIterator; - /** * A RecordStore that stores all data in-memory. 
* @@ -87,13 +85,7 @@ namespace mongo { const char* damageSource, const mutablebson::DamageVector& damages ); - virtual RecordIterator* getIterator( OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const; - - virtual RecordIterator* getIteratorForRepair( OperationContext* txn ) const; - - virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const; + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward) const final; virtual Status truncate( OperationContext* txn ); @@ -166,6 +158,9 @@ namespace mongo { class RemoveChange; class TruncateChange; + class Cursor; + class ReverseCursor; + StatusWith<RecordId> extractAndCheckLocForOplog(const char* data, int len) const; RecordId allocateLoc(); @@ -191,68 +186,4 @@ namespace mongo { Data* const _data; }; - class InMemoryRecordIterator : public RecordIterator { - public: - InMemoryRecordIterator(OperationContext* txn, - const InMemoryRecordStore::Records& records, - const InMemoryRecordStore& rs, - RecordId start = RecordId(), - bool tailable = false); - - virtual bool isEOF(); - - virtual RecordId curr(); - - virtual RecordId getNext(); - - virtual void invalidate(const RecordId& dl); - - virtual void saveState(); - - virtual bool restoreState(OperationContext* txn); - - virtual RecordData dataFor( const RecordId& loc ) const; - - private: - OperationContext* _txn; // not owned - InMemoryRecordStore::Records::const_iterator _it; - bool _tailable; - RecordId _lastLoc; // only for restarting tailable - bool _killedByInvalidate; - - const InMemoryRecordStore::Records& _records; - const InMemoryRecordStore& _rs; - }; - - class InMemoryRecordReverseIterator : public RecordIterator { - public: - InMemoryRecordReverseIterator(OperationContext* txn, - const InMemoryRecordStore::Records& records, - const InMemoryRecordStore& rs, - RecordId start = RecordId()); - - virtual bool isEOF(); - - virtual RecordId curr(); - - virtual RecordId getNext(); - - virtual void invalidate(const RecordId& dl); - - virtual void saveState(); - - virtual bool restoreState(OperationContext* txn); - - virtual RecordData dataFor( const RecordId& loc ) const; - - private: - OperationContext* _txn; // not owned - InMemoryRecordStore::Records::const_reverse_iterator _it; - bool _killedByInvalidate; - RecordId _savedLoc; // isNull if saved at EOF - - const InMemoryRecordStore::Records& _records; - const InMemoryRecordStore& _rs; - }; - } // namespace mongo diff --git a/src/mongo/db/storage/kv/kv_catalog.cpp b/src/mongo/db/storage/kv/kv_catalog.cpp index 271eb7e12c0..53382bd8b93 100644 --- a/src/mongo/db/storage/kv/kv_catalog.cpp +++ b/src/mongo/db/storage/kv/kv_catalog.cpp @@ -133,17 +133,14 @@ namespace { void KVCatalog::init( OperationContext* opCtx ) { // No locking needed since called single threaded. - scoped_ptr<RecordIterator> it( _rs->getIterator( opCtx ) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - RecordData data = it->dataFor( loc ); - BSONObj obj( data.data() ); + auto cursor = _rs->getCursor(opCtx); + while (auto record = cursor->next()) { + BSONObj obj = record->data.releaseToBson(); - // No locking needed since can only be called from one thread. // No rollback since this is just loading already committed data. string ns = obj["ns"].String(); string ident = obj["ident"].String(); - _idents[ns] = Entry( ident, loc ); + _idents[ns] = Entry(ident, record->id); } // In the unlikely event that we have used this _rand before generate a new one. 
@@ -415,11 +412,9 @@ namespace { std::vector<std::string> KVCatalog::getAllIdents( OperationContext* opCtx ) const { std::vector<std::string> v; - scoped_ptr<RecordIterator> it( _rs->getIterator( opCtx ) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - RecordData data = it->dataFor( loc ); - BSONObj obj( data.data() ); + auto cursor = _rs->getCursor(opCtx); + while (auto record = cursor->next()) { + BSONObj obj = record->data.releaseToBson(); v.push_back( obj["ident"].String() ); BSONElement e = obj["idxIdent"]; diff --git a/src/mongo/db/storage/mmap_v1/catalog/namespace_details_collection_entry.cpp b/src/mongo/db/storage/mmap_v1/catalog/namespace_details_collection_entry.cpp index 1bc37f491c6..1d3fef7b918 100644 --- a/src/mongo/db/storage/mmap_v1/catalog/namespace_details_collection_entry.cpp +++ b/src/mongo/db/storage/mmap_v1/catalog/namespace_details_collection_entry.cpp @@ -364,10 +364,9 @@ namespace { if (!namespaces) return; - boost::scoped_ptr<RecordIterator> iterator(namespaces->getIterator(txn)); - while (!iterator->isEOF()) { - const RecordId loc = iterator->getNext(); - const BSONObj oldEntry = iterator->dataFor(loc).toBson(); + auto cursor = namespaces->getCursor(txn); + while (auto record = cursor->next()) { + BSONObj oldEntry = record->data.releaseToBson(); BSONElement e = oldEntry["name"]; if (e.type() != String) continue; @@ -376,8 +375,10 @@ namespace { continue; const BSONObj newEntry = applyUpdateOperators(oldEntry, update); - StatusWith<RecordId> result = namespaces->updateRecord(txn, loc, newEntry.objdata(), - newEntry.objsize(), false, NULL); + StatusWith<RecordId> result = namespaces->updateRecord(txn, record->id, + newEntry.objdata(), + newEntry.objsize(), + false, NULL); fassert(17486, result.getStatus()); return; } diff --git a/src/mongo/db/storage/mmap_v1/extent_manager.h b/src/mongo/db/storage/mmap_v1/extent_manager.h index 99218214721..54191faa2cf 100644 --- a/src/mongo/db/storage/mmap_v1/extent_manager.h +++ b/src/mongo/db/storage/mmap_v1/extent_manager.h @@ -30,6 +30,7 @@ #pragma once +#include <memory> #include <string> #include <vector> @@ -115,8 +116,10 @@ namespace mongo { /** * The extent manager tracks accesses to DiskLocs. This returns non-NULL if the DiskLoc has * been recently accessed, and therefore has likely been paged into physical memory. + * Returns nullptr if the DiskLoc is Null. 
+ * */ - virtual RecordFetcher* recordNeedsFetch( const DiskLoc& loc ) const = 0; + virtual std::unique_ptr<RecordFetcher> recordNeedsFetch( const DiskLoc& loc ) const = 0; /** * @param loc - has to be for a specific MmapV1RecordHeader (not an Extent) diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index aac4b052f5d..c44dcf3f473 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -93,19 +93,10 @@ namespace mongo { invariant(false); } - virtual RecordIterator* getIterator(OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const { + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward) const final { invariant(false); } - virtual RecordIterator* getIteratorForRepair(OperationContext* txn) const { - invariant(false); - } - - virtual std::vector<RecordIterator*> getManyIterators(OperationContext* txn) const { - invariant(false); - } virtual Status truncate(OperationContext* txn) { invariant(false); } diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp index 06d89b5da4c..5045d6a23da 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.cpp @@ -270,11 +270,9 @@ namespace { invariant( details ); RecordStoreV1Base* systemIndexRecordStore = _getIndexRecordStore(); - scoped_ptr<RecordIterator> it( systemIndexRecordStore->getIterator(txn) ); - - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - BSONObj oldIndexSpec = it->dataFor( loc ).toBson(); + auto cursor = systemIndexRecordStore->getCursor(txn); + while (auto record = cursor->next()) { + BSONObj oldIndexSpec = record->data.releaseToBson(); if ( fromNS != oldIndexSpec["ns"].valuestrsafe() ) continue; @@ -326,7 +324,7 @@ namespace { return s; } - systemIndexRecordStore->deleteRecord( txn, loc ); + systemIndexRecordStore->deleteRecord( txn, record->id ); } return Status::OK(); @@ -381,12 +379,11 @@ namespace { BSONObj oldSpec; { RecordStoreV1Base* rs = _getNamespaceRecordStore(); - scoped_ptr<RecordIterator> it( rs->getIterator(txn) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - BSONObj entry = it->dataFor( loc ).toBson(); + auto cursor = rs->getCursor(txn); + while (auto record = cursor->next()) { + BSONObj entry = record->data.releaseToBson(); if ( fromNS == entry["name"].String() ) { - oldSpecLocation = loc; + oldSpecLocation = record->id; oldSpec = entry.getOwned(); break; } @@ -864,13 +861,12 @@ namespace { RecordStoreV1Base* rs = _getNamespaceRecordStore(); invariant( rs ); - scoped_ptr<RecordIterator> it( rs->getIterator(txn) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - BSONObj entry = it->dataFor( loc ).toBson(); + auto cursor = rs->getCursor(txn); + while (auto record = cursor->next()) { + BSONObj entry = record->data.releaseToBson(); BSONElement name = entry["name"]; if ( name.type() == String && name.String() == ns ) { - rs->deleteRecord( txn, loc ); + rs->deleteRecord( txn, record->id ); break; } } @@ -885,10 +881,9 @@ namespace { RecordStoreV1Base* rs = _getNamespaceRecordStore(); invariant( rs ); - scoped_ptr<RecordIterator> it( rs->getIterator(txn) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - BSONObj entry = it->dataFor( loc ).toBson(); + auto cursor = rs->getCursor(txn); + 
while (auto record = cursor->next()) { + BSONObj entry = record->data.releaseToBson(); BSONElement name = entry["name"]; if ( name.type() == String && name.String() == ns ) { CollectionOptions options; diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp b/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp index 9841e427259..ebd298917c5 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.cpp @@ -48,6 +48,7 @@ #include "mongo/db/storage/mmap_v1/mmap_v1_options.h" #include "mongo/db/storage/record_fetcher.h" #include "mongo/db/operation_context.h" +#include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/file.h" #include "mongo/util/log.h" @@ -294,7 +295,8 @@ namespace mongo { return record; } - RecordFetcher* MmapV1ExtentManager::recordNeedsFetch( const DiskLoc& loc ) const { + std::unique_ptr<RecordFetcher> MmapV1ExtentManager::recordNeedsFetch(const DiskLoc& loc) const { + if (loc.isNull()) return {}; MmapV1RecordHeader* record = _recordForV1( loc ); // For testing: if failpoint is enabled we randomly request fetches without @@ -302,15 +304,15 @@ namespace mongo { if ( MONGO_FAIL_POINT( recordNeedsFetchFail ) ) { needsFetchFailCounter.increment(); if ( ( needsFetchFailCounter.get() % kNeedsFetchFailFreq ) == 0 ) { - return new MmapV1RecordFetcher( record ); + return stdx::make_unique<MmapV1RecordFetcher>( record ); } } if ( !_recordAccessTracker->checkAccessedAndMark( record ) ) { - return new MmapV1RecordFetcher( record ); + return stdx::make_unique<MmapV1RecordFetcher>( record ); } - return NULL; + return {}; } DiskLoc MmapV1ExtentManager::extentLocForV1( const DiskLoc& loc ) const { diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.h b/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.h index 4b19b4d6b67..c2343205dc9 100644 --- a/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.h +++ b/src/mongo/db/storage/mmap_v1/mmap_v1_extent_manager.h @@ -124,7 +124,7 @@ namespace mongo { */ MmapV1RecordHeader* recordForV1( const DiskLoc& loc ) const; - RecordFetcher* recordNeedsFetch( const DiskLoc& loc ) const; + std::unique_ptr<RecordFetcher> recordNeedsFetch( const DiskLoc& loc ) const final; /** * @param loc - has to be for a specific MmapV1RecordHeader (not an Extent) diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp index a52326def89..4368190a532 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp @@ -41,6 +41,7 @@ #include "mongo/db/storage/mmap_v1/extent_manager.h" #include "mongo/db/storage/mmap_v1/record.h" #include "mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/progress_meter.h" #include "mongo/util/timer.h" @@ -283,12 +284,6 @@ namespace mongo { return result; } - RecordFetcher* RecordStoreV1Base::recordNeedsFetch( OperationContext* txn, - const RecordId& loc ) const { - return _extentManager->recordNeedsFetch( DiskLoc::fromRecordId(loc) ); - } - - StatusWith<RecordId> RecordStoreV1Base::insertRecord( OperationContext* txn, const DocWriter* doc, bool enforceQuota ) { @@ -503,8 +498,9 @@ namespace mongo { } - RecordIterator* RecordStoreV1Base::getIteratorForRepair(OperationContext* txn) const { - return new RecordStoreV1RepairIterator(txn, this); + std::unique_ptr<RecordCursor> 
RecordStoreV1Base::getCursorForRepair( + OperationContext* txn) const { + return stdx::make_unique<RecordStoreV1RepairCursor>(txn, this); } void RecordStoreV1Base::_addRecordToRecListInExtent(OperationContext* txn, @@ -719,22 +715,22 @@ namespace mongo { long long nlen = 0; long long bsonLen = 0; int outOfOrder = 0; - DiskLoc cl_last; + DiskLoc dl_last; - scoped_ptr<RecordIterator> iterator( getIterator(txn) ); - DiskLoc cl; - while ( !( cl = DiskLoc::fromRecordId(iterator->getNext()) ).isNull() ) { + auto cursor = getCursor(txn); + while (auto record = cursor->next()) { + const auto dl = DiskLoc::fromRecordId(record->id); n++; if ( n < 1000000 ) - recs.insert(cl); + recs.insert(dl); if ( isCapped() ) { - if ( cl < cl_last ) + if ( dl < dl_last ) outOfOrder++; - cl_last = cl; + dl_last = dl; } - MmapV1RecordHeader *r = recordFor(cl); + MmapV1RecordHeader *r = recordFor(dl); len += r->lengthWithHeaders(); nlen += r->netLength(); @@ -922,23 +918,36 @@ namespace mongo { return Status::OK(); } - RecordId RecordStoreV1Base::IntraExtentIterator::getNext() { + boost::optional<Record> RecordStoreV1Base::IntraExtentIterator::next() { + if (_curr.isNull()) return {}; + auto out = _curr.toRecordId(); + advance(); + return {{out, _rs->dataFor(_txn, out)}}; + } + + boost::optional<Record> RecordStoreV1Base::IntraExtentIterator::seekExact(const RecordId& id) { + invariant(!"seekExact not supported"); + } + + void RecordStoreV1Base::IntraExtentIterator::advance() { if (_curr.isNull()) - return RecordId(); + return; - const DiskLoc out = _curr; // we always return where we were, not where we will be. const MmapV1RecordHeader* rec = recordFor(_curr); const int nextOfs = _forward ? rec->nextOfs() : rec->prevOfs(); _curr = (nextOfs == DiskLoc::NullOfs ? DiskLoc() : DiskLoc(_curr.a(), nextOfs)); - return out.toRecordId();; } void RecordStoreV1Base::IntraExtentIterator::invalidate(const RecordId& rid) { if (rid == _curr.toRecordId()) { - getNext(); + advance(); } } + std::unique_ptr<RecordFetcher> RecordStoreV1Base::IntraExtentIterator::fetcherForNext() const { + return _rs->_extentManager->recordNeedsFetch(_curr); + } + int RecordStoreV1Base::quantizeAllocationSpace(int allocSize) { invariant(allocSize <= MaxAllowedAllocation); for ( int i = 0; i < Buckets - 2; i++ ) { // last two bucketSizes are invalid diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h index a7264dd9d29..a9b1ed78aa5 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h @@ -189,9 +189,6 @@ namespace mongo { void deleteRecord( OperationContext* txn, const RecordId& dl ); - virtual RecordFetcher* recordNeedsFetch( OperationContext* txn, - const RecordId& loc ) const; - StatusWith<RecordId> insertRecord( OperationContext* txn, const char* data, int len, @@ -216,7 +213,7 @@ namespace mongo { const char* damageSource, const mutablebson::DamageVector& damages ); - virtual RecordIterator* getIteratorForRepair( OperationContext* txn ) const; + virtual std::unique_ptr<RecordCursor> getCursorForRepair( OperationContext* txn ) const; void increaseStorageSize( OperationContext* txn, int size, bool enforceQuota ); @@ -314,7 +311,7 @@ namespace mongo { ExtentManager* _extentManager; bool _isSystemIndexes; - friend class RecordStoreV1RepairIterator; + friend class RecordStoreV1RepairCursor; }; /** @@ -322,7 +319,7 @@ namespace mongo { * * EOF at end of extent, even if there are more extents. 
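Each concrete cursor in this commit follows the same shape: next() captures the current position, calls a private advance(), and returns a Record, while invalidate() simply advances past the doomed position. A stripped-down, in-memory illustration of that shape against the RecordCursor interface (VectorCursor is illustrative only, not part of the tree):

    // Illustrative cursor with the next()/advance() split used by IntraExtentIterator
    // and the V1 iterators below; backed by a plain vector instead of extents.
    class VectorCursor final : public RecordCursor {
    public:
        explicit VectorCursor(std::vector<Record> records) : _records(std::move(records)) {}

        boost::optional<Record> next() final {
            if (_pos >= _records.size()) return {};      // keep returning none at EOF
            return _records[_pos++];                     // return current, then advance
        }
        boost::optional<Record> seekExact(const RecordId& id) final {
            for (size_t i = 0; i < _records.size(); i++) {
                if (_records[i].id == id) { _pos = i + 1; return _records[i]; }
            }
            return {};                                   // position unspecified on a miss
        }
        void savePositioned() final {}                   // nothing to detach from
        bool restore(OperationContext* txn) final { return true; }
        void invalidate(const RecordId& id) final {
            if (_pos < _records.size() && _records[_pos].id == id) _pos++;  // skip past it
        }

    private:
        std::vector<Record> _records;
        size_t _pos = 0;
    };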
*/ - class RecordStoreV1Base::IntraExtentIterator : public RecordIterator { + class RecordStoreV1Base::IntraExtentIterator final : public RecordCursor { public: IntraExtentIterator(OperationContext* txn, DiskLoc start, @@ -330,22 +327,20 @@ namespace mongo { bool forward = true) : _txn(txn), _curr(start), _rs(rs), _forward(forward) {} - virtual bool isEOF() { return _curr.isNull(); } - - virtual RecordId curr() { return _curr.toRecordId(); } - - virtual RecordId getNext( ); - - virtual void invalidate(const RecordId& dl); + boost::optional<Record> next() final; + boost::optional<Record> seekExact(const RecordId& id) final; + void invalidate(const RecordId& dl) final; + void savePositioned() final {} + bool restore(OperationContext* txn) final { return true; } + std::unique_ptr<RecordFetcher> fetcherForNext() const final; - virtual void saveState() {} - - virtual bool restoreState(OperationContext* txn) { return true; } + private: + virtual const MmapV1RecordHeader* recordFor( const DiskLoc& loc ) const { + return _rs->recordFor(loc); + } - virtual RecordData dataFor( const RecordId& loc ) const { return _rs->dataFor(_txn, loc); } + void advance(); - private: - virtual const MmapV1RecordHeader* recordFor( const DiskLoc& loc ) const { return _rs->recordFor(loc); } OperationContext* _txn; DiskLoc _curr; const RecordStoreV1Base* _rs; diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp index 45143deb2ec..a41dd66ab1e 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp @@ -38,6 +38,7 @@ #include "mongo/db/storage/mmap_v1/mmap.h" #include "mongo/db/storage/mmap_v1/record.h" #include "mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -599,14 +600,15 @@ namespace mongo { } } - RecordIterator* CappedRecordStoreV1::getIterator( OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const { - return new CappedRecordStoreV1Iterator( txn, this, start, false, dir ); + std::unique_ptr<RecordCursor> CappedRecordStoreV1::getCursor(OperationContext* txn, + bool forward) const { + + return stdx::make_unique<CappedRecordStoreV1Iterator>(txn, this, forward); } - vector<RecordIterator*> CappedRecordStoreV1::getManyIterators( OperationContext* txn ) const { - OwnedPointerVector<RecordIterator> iterators; + vector<std::unique_ptr<RecordCursor>> CappedRecordStoreV1::getManyCursors( + OperationContext* txn) const { + vector<std::unique_ptr<RecordCursor>> cursors; if (!_details->capLooped()) { // if we haven't looped yet, just spit out all extents (same as non-capped impl) @@ -616,9 +618,8 @@ namespace mongo { if (ext->firstRecord.isNull()) continue; - iterators.push_back(new RecordStoreV1Base::IntraExtentIterator(txn, - ext->firstRecord, - this)); + cursors.push_back(stdx::make_unique<RecordStoreV1Base::IntraExtentIterator>( + txn, ext->firstRecord, this)); } } else { @@ -634,9 +635,8 @@ namespace mongo { const Extent* ext = _getExtent(txn, extLoc); if (ext->firstRecord != details()->capFirstNewRecord()) { // this means there is old data in capExtent - iterators.push_back(new RecordStoreV1Base::IntraExtentIterator(txn, - ext->firstRecord, - this)); + cursors.push_back(stdx::make_unique<RecordStoreV1Base::IntraExtentIterator>( + txn, ext->firstRecord, this)); } extLoc = ext->xnext.isNull() ? 
details()->firstExtent(txn) : ext->xnext; @@ -645,21 +645,18 @@ namespace mongo { // Next handle all the other extents while (extLoc != capExtent) { const Extent* ext = _getExtent(txn, extLoc); - iterators.push_back(new RecordStoreV1Base::IntraExtentIterator(txn, - ext->firstRecord, - this)); + cursors.push_back(stdx::make_unique<RecordStoreV1Base::IntraExtentIterator>( + txn, ext->firstRecord, this)); extLoc = ext->xnext.isNull() ? details()->firstExtent(txn) : ext->xnext; } // Finally handle the "new" data in the capExtent - iterators.push_back( - new RecordStoreV1Base::IntraExtentIterator(txn, - details()->capFirstNewRecord(), - this)); + cursors.push_back(stdx::make_unique<RecordStoreV1Base::IntraExtentIterator>( + txn, details()->capFirstNewRecord(), this)); } - return iterators.release(); + return cursors; } void CappedRecordStoreV1::_maybeComplain( OperationContext* txn, int len ) const { diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.h b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.h index c294c95e934..186de786f37 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.h @@ -38,7 +38,7 @@ namespace mongo { - class CappedRecordStoreV1 : public RecordStoreV1Base { + class CappedRecordStoreV1 final : public RecordStoreV1Base { public: CappedRecordStoreV1( OperationContext* txn, CappedDocumentDeleteCallback* collection, @@ -47,11 +47,11 @@ namespace mongo { ExtentManager* em, bool isSystemIndexes ); - virtual ~CappedRecordStoreV1(); + ~CappedRecordStoreV1() final; - const char* name() const { return "CappedRecordStoreV1"; } + const char* name() const final { return "CappedRecordStoreV1"; } - virtual Status truncate(OperationContext* txn); + Status truncate(OperationContext* txn) final; /** * Truncate documents newer than the document at 'end' from the capped @@ -60,13 +60,12 @@ namespace mongo { * @param inclusive - Truncate 'end' as well iff true * XXX: this will go away soon, just needed to move for now */ - virtual void temp_cappedTruncateAfter(OperationContext* txn, RecordId end, bool inclusive); + void temp_cappedTruncateAfter(OperationContext* txn, RecordId end, bool inclusive) final; - virtual RecordIterator* getIterator( OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const; + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward) const final; - virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const; + std::vector<std::unique_ptr<RecordCursor>> getManyCursors( + OperationContext* txn) const final; // Start from firstExtent by default. 
DiskLoc firstRecord( OperationContext* txn, @@ -77,18 +76,18 @@ namespace mongo { protected: - virtual bool isCapped() const { return true; } - virtual bool shouldPadInserts() const { return false; } + bool isCapped() const final { return true; } + bool shouldPadInserts() const final { return false; } - virtual void setCappedDeleteCallback( CappedDocumentDeleteCallback* cb ) { + void setCappedDeleteCallback( CappedDocumentDeleteCallback* cb ) final { _deleteCallback = cb; } - virtual StatusWith<DiskLoc> allocRecord( OperationContext* txn, + StatusWith<DiskLoc> allocRecord( OperationContext* txn, int lengthWithHeaders, - bool enforceQuota ); + bool enforceQuota ) final; - virtual void addDeletedRec(OperationContext* txn, const DiskLoc& dloc); + void addDeletedRec(OperationContext* txn, const DiskLoc& dloc) final; private: // -- start copy from cap.cpp -- diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp index e24249f40cc..ea77d224488 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.cpp @@ -39,112 +39,83 @@ namespace mongo { // // Capped collection traversal // - CappedRecordStoreV1Iterator::CappedRecordStoreV1Iterator( OperationContext* txn, - const CappedRecordStoreV1* collection, - const RecordId& start, bool tailable, - const CollectionScanParams::Direction& dir) - : _txn(txn), _recordStore(collection), _curr(DiskLoc::fromRecordId(start)) - , _tailable(tailable), _direction(dir), _killedByInvalidate(false) { - - if (_curr.isNull()) { - - const RecordStoreV1MetaData* nsd = _recordStore->details(); - - // If a start position isn't specified, we fill one out from the start of the - // collection. - if (CollectionScanParams::FORWARD == _direction) { - // Going forwards. - if (!nsd->capLooped()) { - // If our capped collection doesn't loop around, the first record is easy. - _curr = collection->firstRecord(_txn); - } - else { - // Our capped collection has "looped' around. - // Copied verbatim from ForwardCappedCursor::init. - // TODO ELABORATE - _curr = _getExtent( nsd->capExtent() )->firstRecord; - if (!_curr.isNull() && _curr == nsd->capFirstNewRecord()) { - _curr = _getExtent( nsd->capExtent() )->lastRecord; - _curr = nextLoop(_curr); - } - } + CappedRecordStoreV1Iterator::CappedRecordStoreV1Iterator(OperationContext* txn, + const CappedRecordStoreV1* collection, + bool forward) + : _txn(txn), _recordStore(collection), _forward(forward) { + + const RecordStoreV1MetaData* nsd = _recordStore->details(); + + // If a start position isn't specified, we fill one out from the start of the + // collection. + if (_forward) { + // Going forwards. + if (!nsd->capLooped()) { + // If our capped collection doesn't loop around, the first record is easy. + _curr = collection->firstRecord(_txn); } else { - // Going backwards - if (!nsd->capLooped()) { - // Start at the end. - _curr = collection->lastRecord(_txn); - } - else { + // Our capped collection has "looped' around. + // Copied verbatim from ForwardCappedCursor::init. 
+ // TODO ELABORATE + _curr = _getExtent( nsd->capExtent() )->firstRecord; + if (!_curr.isNull() && _curr == nsd->capFirstNewRecord()) { _curr = _getExtent( nsd->capExtent() )->lastRecord; + _curr = nextLoop(_curr); } } } - } - - bool CappedRecordStoreV1Iterator::isEOF() { return _curr.isNull(); } - - RecordId CappedRecordStoreV1Iterator::curr() { return _curr.toRecordId(); } - - RecordId CappedRecordStoreV1Iterator::getNext() { - DiskLoc ret = _curr; - - // Move to the next thing. - if (!isEOF()) { - _prev = _curr; - _curr = getNextCapped(_curr); - } - else if (_tailable && !_prev.isNull()) { - // If we're tailable, there COULD have been something inserted even though we were - // previously EOF. Look at the next thing from 'prev' and see. - DiskLoc newCurr = getNextCapped(_prev); - - if (!newCurr.isNull()) { - // There's something new to return. _curr always points to the next thing to - // return. Update it, and move _prev to the thing we just returned. - _prev = ret = newCurr; - _curr = getNextCapped(_prev); + else { + // Going backwards + if (!nsd->capLooped()) { + // Start at the end. + _curr = collection->lastRecord(_txn); + } + else { + _curr = _getExtent( nsd->capExtent() )->lastRecord; } } + } + + boost::optional<Record> CappedRecordStoreV1Iterator::next() { + if (isEOF()) return {}; + auto toReturn = _curr.toRecordId(); + _curr = getNextCapped(_curr); + return {{toReturn, _recordStore->RecordStore::dataFor(_txn, toReturn)}}; + } - return ret.toRecordId(); + boost::optional<Record> CappedRecordStoreV1Iterator::seekExact(const RecordId& id) { + _curr = getNextCapped(DiskLoc::fromRecordId(id)); + return {{id, _recordStore->RecordStore::dataFor(_txn, id)}}; } void CappedRecordStoreV1Iterator::invalidate(const RecordId& id) { const DiskLoc dl = DiskLoc::fromRecordId(id); - if ((_tailable && _curr.isNull() && dl == _prev) || (dl == _curr)) { - // In the _tailable case, we're about to kill the DiskLoc that we're tailing. Nothing - // that we can possibly do to survive that. - // - // In the _curr case, we *could* move to the next thing, since there is actually a next + if (dl == _curr) { + // We *could* move to the next thing, since there is actually a next // thing, but according to clientcursor.cpp: // "note we cannot advance here. if this condition occurs, writes to the oplog - // have "caught" the reader. skipping ahead, the reader would miss postentially + // have "caught" the reader. skipping ahead, the reader would miss potentially // important data." - _curr = _prev = DiskLoc(); + _curr = DiskLoc(); _killedByInvalidate = true; } } - void CappedRecordStoreV1Iterator::saveState() { + void CappedRecordStoreV1Iterator::savePositioned() { + _txn = nullptr; } - bool CappedRecordStoreV1Iterator::restoreState(OperationContext* txn) { + bool CappedRecordStoreV1Iterator::restore(OperationContext* txn) { _txn = txn; - // If invalidate invalidated the DiskLoc we relied on, give up now. - if (_killedByInvalidate) { - _recordStore = NULL; - return false; - } - - return true; + return !_killedByInvalidate; } DiskLoc CappedRecordStoreV1Iterator::getNextCapped(const DiskLoc& dl) { invariant(!dl.isNull()); const RecordStoreV1MetaData* details = _recordStore->details(); - if (CollectionScanParams::FORWARD == _direction) { + if (_forward) { // If it's not looped, it's easy. 
if (!_recordStore->details()->capLooped()) { return _getNextRecord( dl ); @@ -220,9 +191,6 @@ namespace mongo { return _recordStore->lastRecord(_txn); } - RecordData CappedRecordStoreV1Iterator::dataFor( const RecordId& loc ) const { - return _recordStore->dataFor( _txn, loc ); - } Extent* CappedRecordStoreV1Iterator::_getExtent( const DiskLoc& loc ) { return _recordStore->_extentManager->getExtent( loc ); @@ -236,4 +204,13 @@ namespace mongo { return _recordStore->getPrevRecord( _txn, loc ); } + std::unique_ptr<RecordFetcher> CappedRecordStoreV1Iterator::fetcherForNext() const { + return _recordStore->_extentManager->recordNeedsFetch(_curr); + } + + std::unique_ptr<RecordFetcher> CappedRecordStoreV1Iterator::fetcherForId( + const RecordId& id) const { + return _recordStore->_extentManager->recordNeedsFetch(DiskLoc::fromRecordId(id)); + } + } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h index 99f83b20e7a..de2b6fda5e3 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_iterator.h @@ -40,32 +40,25 @@ namespace mongo { /** * This class iterates over a capped collection identified by 'ns'. * The collection must exist when the constructor is called. - * - * If start is not DiskLoc(), the iteration begins at that DiskLoc. - * - * If tailable is true, getNext() can be called after isEOF. It will use the last valid - * returned DiskLoc and try to find the next record from that. */ - class CappedRecordStoreV1Iterator : public RecordIterator { + class CappedRecordStoreV1Iterator final : public RecordCursor { public: CappedRecordStoreV1Iterator( OperationContext* txn, const CappedRecordStoreV1* collection, - const RecordId& start, - bool tailable, - const CollectionScanParams::Direction& dir ); - virtual ~CappedRecordStoreV1Iterator() { } + bool forward ); - // If this is a tailable cursor, isEOF could change its mind after a call to getNext(). - virtual bool isEOF(); - virtual RecordId getNext(); - virtual RecordId curr(); + boost::optional<Record> next() final; + boost::optional<Record> seekExact(const RecordId& id) final; + void savePositioned() final; + bool restore(OperationContext* txn) final; + void invalidate(const RecordId& dl) final; + std::unique_ptr<RecordFetcher> fetcherForNext() const final; + std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const final; - virtual void invalidate(const RecordId& dl); - virtual void saveState(); - virtual bool restoreState(OperationContext* txn); - - virtual RecordData dataFor( const RecordId& loc ) const; private: + void advance(); + bool isEOF() { return _curr.isNull(); } + /** * Internal collection navigation helper methods. */ @@ -82,20 +75,16 @@ namespace mongo { OperationContext* _txn; // The collection we're iterating over. - const CappedRecordStoreV1* _recordStore; + const CappedRecordStoreV1* const _recordStore; // The result returned on the next call to getNext(). DiskLoc _curr; - // If we're tailable, we try to progress from the last valid result when we hit the end. - DiskLoc _prev; - bool _tailable; - - CollectionScanParams::Direction _direction; + const bool _forward; // If invalidate kills the DiskLoc we need to move forward, we kill the iterator. See the // comment in the body of invalidate(...). 
- bool _killedByInvalidate; + bool _killedByInvalidate = false; }; } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp index d3c834b9ce0..0c369587f9b 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp @@ -680,13 +680,10 @@ namespace { void walkAndCount (int expectedCount) { // Walk the collection going forward. { - CappedRecordStoreV1Iterator it(&txn, &rs, RecordId(), false, - CollectionScanParams::FORWARD); - + CappedRecordStoreV1Iterator cursor(&txn, &rs, /*forward=*/true); int resultCount = 0; - while (!it.isEOF()) { + while (auto record = cursor.next()) { ++resultCount; - it.getNext(); } ASSERT_EQUALS(resultCount, expectedCount); @@ -694,13 +691,10 @@ namespace { // Walk the collection going backwards. { - CappedRecordStoreV1Iterator it(&txn, &rs, RecordId(), false, - CollectionScanParams::BACKWARD); - + CappedRecordStoreV1Iterator cursor(&txn, &rs, /*forward=*/false); int resultCount = expectedCount; - while (!it.isEOF()) { + while (auto record = cursor.next()) { --resultCount; - it.getNext(); } ASSERT_EQUALS(resultCount, 0); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.cpp index 7f8151dce81..a4cb9977fe3 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.cpp @@ -40,31 +40,34 @@ namespace mongo { using std::endl; - RecordStoreV1RepairIterator::RecordStoreV1RepairIterator(OperationContext* txn, + RecordStoreV1RepairCursor::RecordStoreV1RepairCursor(OperationContext* txn, const RecordStoreV1Base* recordStore) : _txn(txn), _recordStore(recordStore), _stage(FORWARD_SCAN) { // Position the iterator at the first record // - getNext(); + advance(); } - bool RecordStoreV1RepairIterator::isEOF() { - return _currRecord.isNull(); + boost::optional<Record> RecordStoreV1RepairCursor::next() { + if (_currRecord.isNull()) return {}; + auto out = _currRecord.toRecordId(); + advance(); + return {{out, _recordStore->dataFor(_txn, out)}}; } - RecordId RecordStoreV1RepairIterator::curr() { return _currRecord.toRecordId(); } - - RecordId RecordStoreV1RepairIterator::getNext() { - const DiskLoc retVal = _currRecord; + boost::optional<Record> RecordStoreV1RepairCursor::seekExact(const RecordId& id) { + invariant(!"seekExact not supported"); + } + void RecordStoreV1RepairCursor::advance() { const ExtentManager* em = _recordStore->_extentManager; while (true) { if (_currRecord.isNull()) { if (!_advanceToNextValidExtent()) { - return retVal.toRecordId(); + return; } _seenInCurrentExtent.clear(); @@ -108,11 +111,11 @@ namespace mongo { continue; } - return retVal.toRecordId(); + return; } } - bool RecordStoreV1RepairIterator::_advanceToNextValidExtent() { + bool RecordStoreV1RepairCursor::_advanceToNextValidExtent() { const ExtentManager* em = _recordStore->_extentManager; while (true) { @@ -186,7 +189,7 @@ namespace mongo { return true; } - void RecordStoreV1RepairIterator::invalidate(const RecordId& id) { + void RecordStoreV1RepairCursor::invalidate(const RecordId& id) { // If we see this record again it probably means it was reinserted rather than an infinite // loop. If we do loop, we should quickly hit another seen record that hasn't been // invalidated. 
@@ -196,22 +199,18 @@ namespace mongo { if (_currRecord == dl) { // The DiskLoc being invalidated is also the one pointed at by this iterator. We // advance the iterator so it's not pointing at invalid data. - getNext(); + advance(); if (_currRecord == dl) { // Even after advancing the iterator, we're still pointing at the DiskLoc being // invalidated. This is expected when 'dl' is the last DiskLoc in the FORWARD scan, // and the initial call to getNext() moves the iterator to the first loc in the // BACKWARDS scan. - getNext(); + advance(); } invariant(_currRecord != dl); } } - RecordData RecordStoreV1RepairIterator::dataFor(const RecordId& loc) const { - return _recordStore->dataFor( _txn, loc ); - } - } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h index c57c5886ccc..6b93ad5941a 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_repair_iterator.h @@ -40,26 +40,26 @@ namespace mongo { * extent) and once backwards in an attempt to salvage potentially corrupted or unreachable * records. It is used by the mongodump --repair option. */ - class RecordStoreV1RepairIterator : public RecordIterator { + class RecordStoreV1RepairCursor final : public RecordCursor { public: - RecordStoreV1RepairIterator(OperationContext* txn, + RecordStoreV1RepairCursor(OperationContext* txn, const RecordStoreV1Base* recordStore); - virtual ~RecordStoreV1RepairIterator() { } - virtual bool isEOF(); - virtual RecordId getNext(); - virtual RecordId curr(); - - virtual void invalidate(const RecordId& dl); - virtual void saveState() { } - virtual bool restoreState(OperationContext* txn) { + boost::optional<Record> next() final; + boost::optional<Record> seekExact(const RecordId& id) final; + void invalidate(const RecordId& dl); + void savePositioned() final { _txn = nullptr; } + bool restore(OperationContext* txn) final { _txn = txn; return true; } - virtual RecordData dataFor( const RecordId& loc ) const; + // Explicitly not supporting fetcherForNext(). The expected use case for this class is a + // special offline operation where there are no concurrent operations, so it would be better + // to take the pagefault inline with the operation. 
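Because getCursorForRepair() only promises forward scanning (seekExact() is deliberately unsupported), salvage code drains it like an ordinary scan. A hedged sketch of such a pass; salvageAll is hypothetical, and the per-record WriteUnitOfWork is just one plausible policy:

    // Hypothetical salvage loop copying whatever a repair cursor can still reach.
    long long salvageAll(OperationContext* txn, const RecordStore* damaged, RecordStore* dest) {
        auto cursor = damaged->getCursorForRepair(txn);
        if (!cursor) return 0;                           // engine may not support repair cursors
        long long copied = 0;
        while (auto record = cursor->next()) {           // forward-only; never seekExact() here
            BSONObj doc = record->data.releaseToBson();
            WriteUnitOfWork wunit(txn);
            StatusWith<RecordId> res =
                dest->insertRecord(txn, doc.objdata(), doc.objsize(), /*enforceQuota=*/false);
            if (res.getStatus().isOK()) {
                copied++;
                wunit.commit();
            }
        }
        return copied;
    }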
private: + void advance(); /** * Based on the direction of scan, finds the next valid (un-corrupted) extent in the chain diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp index d6ee3fe4ddd..029883254bd 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp @@ -44,6 +44,7 @@ #include "mongo/db/storage/mmap_v1/record.h" #include "mongo/db/operation_context.h" #include "mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/progress_meter.h" #include "mongo/util/mongoutils/str.h" @@ -238,24 +239,26 @@ namespace mongo { _details->setDeletedListEntry(txn, b, dloc); } - RecordIterator* SimpleRecordStoreV1::getIterator( OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const { - return new SimpleRecordStoreV1Iterator( txn, this, start, dir ); + std::unique_ptr<RecordCursor> SimpleRecordStoreV1::getCursor(OperationContext* txn, + bool forward) const { + return stdx::make_unique<SimpleRecordStoreV1Iterator>( txn, this, forward ); } - vector<RecordIterator*> SimpleRecordStoreV1::getManyIterators( OperationContext* txn ) const { - OwnedPointerVector<RecordIterator> iterators; + vector<std::unique_ptr<RecordCursor>> SimpleRecordStoreV1::getManyCursors( + OperationContext* txn) const { + vector<std::unique_ptr<RecordCursor>> cursors; const Extent* ext; for (DiskLoc extLoc = details()->firstExtent(txn); !extLoc.isNull(); extLoc = ext->xnext) { ext = _getExtent(txn, extLoc); if (ext->firstRecord.isNull()) continue; - iterators.push_back( - new RecordStoreV1Base::IntraExtentIterator(txn, ext->firstRecord, this)); + cursors.push_back( + stdx::make_unique<RecordStoreV1Base::IntraExtentIterator>(txn, + ext->firstRecord, + this)); } - return iterators.release(); + return cursors; } class CompactDocWriter : public DocWriter { diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.h b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.h index b5ea90c8aac..a108305492a 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.h @@ -36,7 +36,7 @@ namespace mongo { - class SimpleRecordStoreV1Iterator; + class SimpleRecordStoreV1Cursor; // used by index and original collections class SimpleRecordStoreV1 : public RecordStoreV1Base { @@ -51,10 +51,10 @@ namespace mongo { const char* name() const { return "SimpleRecordStoreV1"; } - virtual RecordIterator* getIterator( OperationContext* txn, const RecordId& start, - const CollectionScanParams::Direction& dir) const; + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward) const final; - virtual std::vector<RecordIterator*> getManyIterators(OperationContext* txn) const; + std::vector<std::unique_ptr<RecordCursor>> getManyCursors( + OperationContext* txn) const final; virtual Status truncate(OperationContext* txn); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp index c6a52b03f20..ec1e51abe02 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.cpp @@ -41,94 +41,94 @@ namespace mongo { SimpleRecordStoreV1Iterator::SimpleRecordStoreV1Iterator(OperationContext* txn, const SimpleRecordStoreV1* collection, - const 
RecordId& start, - const CollectionScanParams::Direction& dir) + bool forward) : _txn(txn) - , _curr(DiskLoc::fromRecordId(start)) , _recordStore(collection) - , _direction(dir) { + , _forward(forward) { - if (_curr.isNull()) { - - const ExtentManager* em = _recordStore->_extentManager; + // Eagerly seek to first Record on creation since it is cheap. + const ExtentManager* em = _recordStore->_extentManager; + if ( _recordStore->details()->firstExtent(txn).isNull() ) { + // nothing in the collection + verify( _recordStore->details()->lastExtent(txn).isNull() ); + } + else if (_forward) { + // Find a non-empty extent and start with the first record in it. + Extent* e = em->getExtent( _recordStore->details()->firstExtent(txn) ); - if ( _recordStore->details()->firstExtent(txn).isNull() ) { - // nothing in the collection - verify( _recordStore->details()->lastExtent(txn).isNull() ); + while (e->firstRecord.isNull() && !e->xnext.isNull()) { + e = em->getExtent( e->xnext ); } - else if (CollectionScanParams::FORWARD == _direction) { - - // Find a non-empty extent and start with the first record in it. - Extent* e = em->getExtent( _recordStore->details()->firstExtent(txn) ); - while (e->firstRecord.isNull() && !e->xnext.isNull()) { - e = em->getExtent( e->xnext ); - } - - // _curr may be set to DiskLoc() here if e->lastRecord isNull but there is no - // valid e->xnext - _curr = e->firstRecord; - } - else { - // Walk backwards, skipping empty extents, and use the last record in the first - // non-empty extent we see. - Extent* e = em->getExtent( _recordStore->details()->lastExtent(txn) ); - - // TODO ELABORATE - // Does one of e->lastRecord.isNull(), e.firstRecord.isNull() imply the other? - while (e->lastRecord.isNull() && !e->xprev.isNull()) { - e = em->getExtent( e->xprev ); - } - - // _curr may be set to DiskLoc() here if e->lastRecord isNull but there is no - // valid e->xprev - _curr = e->lastRecord; + // _curr may be set to DiskLoc() here if e->lastRecord isNull but there is no + // valid e->xnext + _curr = e->firstRecord; + } + else { + // Walk backwards, skipping empty extents, and use the last record in the first + // non-empty extent we see. + Extent* e = em->getExtent( _recordStore->details()->lastExtent(txn) ); + + // TODO ELABORATE + // Does one of e->lastRecord.isNull(), e.firstRecord.isNull() imply the other? + while (e->lastRecord.isNull() && !e->xprev.isNull()) { + e = em->getExtent( e->xprev ); } + + // _curr may be set to DiskLoc() here if e->lastRecord isNull but there is no + // valid e->xprev + _curr = e->lastRecord; } } - bool SimpleRecordStoreV1Iterator::isEOF() { - return _curr.isNull(); + boost::optional<Record> SimpleRecordStoreV1Iterator::next() { + if (isEOF()) return {}; + auto toReturn = _curr.toRecordId(); + advance(); + return {{toReturn, _recordStore->RecordStore::dataFor(_txn, toReturn)}}; } - RecordId SimpleRecordStoreV1Iterator::curr() { return _curr.toRecordId(); } - - RecordId SimpleRecordStoreV1Iterator::getNext() { - DiskLoc ret = _curr; + boost::optional<Record> SimpleRecordStoreV1Iterator::seekExact(const RecordId& id) { + _curr = DiskLoc::fromRecordId(id); + advance(); + return {{id, _recordStore->RecordStore::dataFor(_txn, id)}}; + } + void SimpleRecordStoreV1Iterator::advance() { // Move to the next thing. 
if (!isEOF()) { - if (CollectionScanParams::FORWARD == _direction) { + if (_forward) { _curr = _recordStore->getNextRecord( _txn, _curr ); } else { _curr = _recordStore->getPrevRecord( _txn, _curr ); } } - - return ret.toRecordId(); } void SimpleRecordStoreV1Iterator::invalidate(const RecordId& dl) { // Just move past the thing being deleted. if (dl == _curr.toRecordId()) { - // We don't care about the return of getNext so much as the side effect of moving _curr - // to the 'next' thing. - getNext(); + advance(); } } - void SimpleRecordStoreV1Iterator::saveState() { + void SimpleRecordStoreV1Iterator::savePositioned() { + _txn = nullptr; } - bool SimpleRecordStoreV1Iterator::restoreState(OperationContext* txn) { + bool SimpleRecordStoreV1Iterator::restore(OperationContext* txn) { _txn = txn; // if the collection is dropped, then the cursor should be destroyed return true; } - RecordData SimpleRecordStoreV1Iterator::dataFor( const RecordId& loc ) const { - return _recordStore->dataFor( _txn, loc ); + std::unique_ptr<RecordFetcher> SimpleRecordStoreV1Iterator::fetcherForNext() const { + return _recordStore->_extentManager->recordNeedsFetch(_curr); } + std::unique_ptr<RecordFetcher> SimpleRecordStoreV1Iterator::fetcherForId( + const RecordId& id) const { + return _recordStore->_extentManager->recordNeedsFetch(DiskLoc::fromRecordId(id)); + } } diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h index 0aabd6e28a2..c19c0c386b3 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_iterator.h @@ -41,34 +41,31 @@ namespace mongo { * * If start is not DiskLoc(), the iteration begins at that DiskLoc. */ - class SimpleRecordStoreV1Iterator : public RecordIterator { + class SimpleRecordStoreV1Iterator final : public RecordCursor { public: SimpleRecordStoreV1Iterator( OperationContext* txn, const SimpleRecordStoreV1* records, - const RecordId& start, - const CollectionScanParams::Direction& dir ); - virtual ~SimpleRecordStoreV1Iterator() { } + bool forward); - virtual bool isEOF(); - virtual RecordId getNext(); - virtual RecordId curr(); - - virtual void invalidate(const RecordId& dl); - virtual void saveState(); - virtual bool restoreState(OperationContext* txn); - - virtual RecordData dataFor( const RecordId& loc ) const; + boost::optional<Record> next() final; + boost::optional<Record> seekExact(const RecordId& id) final; + void savePositioned() final; + bool restore(OperationContext* txn) final; + void invalidate(const RecordId& dl) final; + std::unique_ptr<RecordFetcher> fetcherForNext() const final; + std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const final; private: + void advance(); + bool isEOF() { return _curr.isNull(); } + // for getNext, not owned OperationContext* _txn; // The result returned on the next call to getNext(). 
DiskLoc _curr; - - const SimpleRecordStoreV1* _recordStore; - - CollectionScanParams::Direction _direction; + const SimpleRecordStoreV1* const _recordStore; + const bool _forward; }; } // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.cpp index 060da868303..7bfaee1867e 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.cpp @@ -263,8 +263,8 @@ namespace mongo { invariant(false); } - RecordFetcher* DummyExtentManager::recordNeedsFetch( const DiskLoc& loc ) const { - return NULL; + std::unique_ptr<RecordFetcher> DummyExtentManager::recordNeedsFetch(const DiskLoc& loc) const { + return {}; } MmapV1RecordHeader* DummyExtentManager::recordForV1( const DiskLoc& loc ) const { diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.h b/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.h index 7c2fec02626..f37969c1ca6 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_test_help.h @@ -136,7 +136,7 @@ namespace mongo { virtual MmapV1RecordHeader* recordForV1( const DiskLoc& loc ) const; - virtual RecordFetcher* recordNeedsFetch( const DiskLoc& loc ) const; + virtual std::unique_ptr<RecordFetcher> recordNeedsFetch( const DiskLoc& loc ) const final; virtual Extent* extentForV1( const DiskLoc& loc ) const; diff --git a/src/mongo/db/storage/mmap_v1/repair_database.cpp b/src/mongo/db/storage/mmap_v1/repair_database.cpp index 777f8f25b05..f85120f8a87 100644 --- a/src/mongo/db/storage/mmap_v1/repair_database.cpp +++ b/src/mongo/db/storage/mmap_v1/repair_database.cpp @@ -339,10 +339,9 @@ namespace mongo { OldClientContext ctx(txn, ns ); Collection* coll = originalDatabase->getCollection( ns ); if ( coll ) { - scoped_ptr<RecordIterator> it( coll->getIterator(txn) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - BSONObj obj = coll->docFor(txn, loc).value(); + auto cursor = coll->getCursor(txn); + while (auto record = cursor->next()) { + BSONObj obj = record->data.releaseToBson(); string ns = obj["name"].String(); @@ -404,12 +403,9 @@ namespace mongo { } } - scoped_ptr<RecordIterator> iterator(originalCollection->getIterator(txn)); - while ( !iterator->isEOF() ) { - RecordId loc = iterator->getNext(); - invariant( !loc.isNull() ); - - BSONObj doc = originalCollection->docFor(txn, loc).value(); + auto cursor = originalCollection->getCursor(txn); + while (auto record = cursor->next()) { + BSONObj doc = record->data.releaseToBson(); WriteUnitOfWork wunit(txn); StatusWith<RecordId> result = tempCollection->insertDocument(txn, diff --git a/src/mongo/db/storage/record_data.h b/src/mongo/db/storage/record_data.h index 57593ab09e8..612408f84c6 100644 --- a/src/mongo/db/storage/record_data.h +++ b/src/mongo/db/storage/record_data.h @@ -71,6 +71,18 @@ namespace mongo { // TODO uncomment once we require compilers that support overloading for rvalue this. 
// BSONObj toBson() && { return releaseToBson(); } + RecordData getOwned() const { + if (isOwned()) return *this; + auto buffer = SharedBuffer::allocate(_size); + memcpy(buffer.get(), _data, _size); + return RecordData(buffer, _size); + } + + void makeOwned() { + if (isOwned()) return; + *this = getOwned(); + } + private: const char* _data; int _size; diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index 417a7c2ff4a..5f4285995df 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -37,6 +37,7 @@ #include "mongo/db/exec/collection_scan_common.h" #include "mongo/db/record_id.h" #include "mongo/db/storage/record_data.h" +#include "mongo/db/storage/record_fetcher.h" namespace mongo { @@ -48,7 +49,6 @@ namespace mongo { class MAdvise; class NamespaceDetails; class OperationContext; - class Record; class RecordFetcher; class RecordStoreCompactAdaptor; @@ -83,40 +83,136 @@ namespace mongo { }; /** - * A RecordIterator provides an interface for walking over a RecordStore. - * The details of navigating the collection's structure are below this interface. + * The data items stored in a RecordStore. */ - class RecordIterator { + struct Record { + RecordId id; + RecordData data; + }; + + /** + * Retrieves Records from a RecordStore. + * + * A cursor is constructed with a direction flag with the following effects: + * - The direction that next() moves. + * - If a restore cannot return to the saved position, cursors will be positioned on the + * closest position *after* the query in the direction of the scan. + * + * A cursor is tied to a transaction, such as the OperationContext or a WriteUnitOfWork + * inside that context. Any cursor acquired inside a transaction is invalid outside + * of that transaction, instead use the save and restore methods to reestablish the cursor. + * + * Any method other than invalidate and the save methods may throw WriteConflict exception. If + * that happens, the cursor may not be used again until it has been saved and successfully + * restored. If next() or restore() throw a WCE the cursor's position will be the same as before + * the call (strong exception guarantee). All other methods leave the cursor in a valid state + * but with an unspecified position (basic exception guarantee). If any exception other than + * WCE is thrown, the cursor must be destroyed, which is guaranteed not to leak any resources. + * + * Any returned unowned BSON is only valid until the next call to any method on this + * interface. + * + * Implementations may override any default implementation if they can provide a more + * efficient implementation. + */ + class RecordCursor { public: - virtual ~RecordIterator() { } + virtual ~RecordCursor() = default; - // True if getNext will produce no more data, false otherwise. - virtual bool isEOF() = 0; + /** + * Moves forward and returns the new data or boost::none if there is no more data. + * Continues returning boost::none once it reaches EOF. + */ + virtual boost::optional<Record> next() = 0; - // Return the RecordId that the iterator points at. Returns RecordId() if isEOF. - virtual RecordId curr() = 0; + // + // Seeking + // + // Warning: MMAPv1 cannot detect if RecordIds are valid. Therefore callers should only pass + // potentially deleted RecordIds to seek methods if they know that MMAPv1 is not the current + // storage engine. All new storage engines must support detecting the existence of Records. 
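Since cursor-returned data may be unowned (valid only until the next call on the cursor, or until the collection lock is dropped), callers that hold onto a Record copy it first with the new RecordData::getOwned()/makeOwned() helpers, exactly as the default findRecord() further down does. A small sketch under that assumption; lastRecordOwned is hypothetical:

    // Sketch: keeping the last record of a scan alive after the cursor is destroyed.
    boost::optional<Record> lastRecordOwned(OperationContext* txn, const RecordStore* rs) {
        boost::optional<Record> last;
        auto cursor = rs->getCursor(txn);
        while (auto record = cursor->next()) {
            record->data.makeOwned();        // unowned buffers expire on the next cursor call
            last = std::move(*record);
        }
        return last;                         // safe to use after 'cursor' goes out of scope
    }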
+ // - // Return the RecordId that the iterator points at and move the iterator to the next item - // from the collection. Returns RecordId() if isEOF. - virtual RecordId getNext() = 0; + /** + * Seeks to a Record with the provided id. + * + * If an exact match can't be found, boost::none will be returned and the resulting position + * of the cursor is unspecified. + */ + virtual boost::optional<Record> seekExact(const RecordId& id) = 0; - // Can only be called after saveState and before restoreState. - virtual void invalidate(const RecordId& dl) = 0; + // + // Saving and restoring state + // - // Save any state required to resume operation (without crashing) after RecordId deletion or - // a collection drop. - virtual void saveState() = 0; + /** + * Prepares for state changes in underlying data in a way that allows the cursor's + * current position to be restored. + * + * It is safe to call savePositioned multiple times in a row. + * No other method (excluding destructor) may be called until successfully restored. + */ + virtual void savePositioned() = 0; - // Returns true if collection still exists, false otherwise. - // The state of the iterator may be restored into a different context - // than the one it was created in. - virtual bool restoreState(OperationContext* txn) = 0; + /** + * Prepares for state changes in underlying data without necessarily saving the current + * state. + * + * The cursor's position when restored is unspecified. Caller is expected to seek rather + * than call next() following the restore. + * + * It is safe to call saveUnpositioned multiple times in a row. + * No other method (excluding destructor) may be called until successfully restored. + */ + virtual void saveUnpositioned() { savePositioned(); } - // normally this will just go back to the RecordStore and convert - // but this gives the iterator an oppurtnity to optimize - virtual RecordData dataFor( const RecordId& loc ) const = 0; - }; + /** + * Recovers from potential state changes in underlying data. + * + * Returns false if it is invalid to continue using this iterator. This usually means that + * capped deletes have caught up to the position of this iterator and continuing could + * result in missed data. + * + * If the former position no longer exists, but it is safe to continue iterating, the + * following call to next() will return the next closest position in the direction of the + * scan, if any. + * + * This handles restoring after either savePositioned() or saveUnpositioned(). + */ + virtual bool restore(OperationContext* txn) = 0; + /** + * Inform the cursor that this id is being invalidated. + * Must be called between save and restore. + * + * WARNING: Storage engines other than MMAPv1 should not depend on this being called. + */ + virtual void invalidate(const RecordId& id) {}; + + // + // RecordFetchers + // + // Storage engines which do not support document-level locking hold locks at collection or + // database granularity. As an optimization, these locks can be yielded when a record needs + // to be fetched from secondary storage. If this method returns non-NULL, then it indicates + // that the query system layer should yield its locks, following the protocol defined by the + // RecordFetcher class, so that a potential page fault is triggered out of the lock. + // + // Storage engines which support document-level locking need not implement this. + // + // TODO see if these can be replaced by WriteConflictException. 
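The savePositioned()/restore() pair replaces saveState()/restoreState(), and fetcherForNext() lets non-document-locking engines fault the next record into memory outside the lock before continuing. A sketch of that protocol under stated assumptions: yieldAndFetch stands in for the query layer's yield machinery and whatever it does with the RecordFetcher, and is not a real API:

    // Sketch of the save/yield/restore protocol; the yield step is caller-provided.
    bool scanWithYields(OperationContext* txn, const RecordStore* rs,
                        const std::function<void(RecordFetcher*)>& yieldAndFetch) {
        auto cursor = rs->getCursor(txn);
        while (true) {
            if (auto fetcher = cursor->fetcherForNext()) {  // next record likely not in memory
                cursor->savePositioned();                   // no other calls until restore()
                yieldAndFetch(fetcher.get());               // yield locks, fault the record in
                if (!cursor->restore(txn)) return false;    // e.g. capped deletes caught up
            }
            auto record = cursor->next();
            if (!record) return true;                       // EOF
            // ... use record->id and record->data here ...
        }
    }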
+ // + + /** + * Returns a RecordFetcher if needed for a call to next() or none if unneeded. + */ + virtual std::unique_ptr<RecordFetcher> fetcherForNext() const { return {}; } + + /** + * Returns a RecordFetcher if needed to fetch the provided Record or none if unneeded. + */ + virtual std::unique_ptr<RecordFetcher> fetcherForId(const RecordId& id) const { return {}; } + }; /** * A RecordStore provides an abstraction used for storing documents in a collection, @@ -124,7 +220,8 @@ namespace mongo { * are also used for implementing catalogs. * * Many methods take an OperationContext parameter. This contains the RecoveryUnit, with - * all RecordStore specific transaction information, as well as the LockState. + * all RecordStore specific transaction information, as well as the LockState. Methods that take + * an OperationContext may throw a WriteConflictException. */ class RecordStore { MONGO_DISALLOW_COPYING(RecordStore); @@ -167,15 +264,46 @@ namespace mongo { // CRUD related - virtual RecordData dataFor( OperationContext* txn, const RecordId& loc) const = 0; + /** + * Get the RecordData at loc, which must exist. + * + * If unowned data is returned, it is valid until the next modification of this Record or + * the lock on this collection is released. + * + * In general, prefer findRecord or RecordCursor::seekExact since they can tell you if a + * record has been removed. + */ + virtual RecordData dataFor(OperationContext* txn, const RecordId& loc) const { + RecordData data; + invariant(findRecord(txn, loc, &data)); + return data; + } /** * @param out - If the record exists, the contents of this are set. * @return true iff there is a Record for loc + * + * If unowned data is returned, it is valid until the next modification of this Record or + * the lock on this collection is released. + * + * In general prefer RecordCursor::seekExact since it can avoid copying data in more + * storageEngines. + * + * Warning: MMAPv1 cannot detect if RecordIds are valid. Therefore callers should only pass + * potentially deleted RecordIds to seek methods if they know that MMAPv1 is not the current + * storage engine. All new storage engines must support detecting the existence of Records. */ - virtual bool findRecord( OperationContext* txn, - const RecordId& loc, - RecordData* out ) const = 0; + virtual bool findRecord(OperationContext* txn, + const RecordId& loc, + RecordData* out) const { + auto cursor = getCursor(txn); + auto record = cursor->seekExact(loc); + if (!record) return false; + + record->data.makeOwned(); // Unowned data expires when cursor goes out of scope. + *out = std::move(record->data); + return true; + } virtual void deleteRecord( OperationContext* txn, const RecordId& dl ) = 0; @@ -220,49 +348,45 @@ namespace mongo { const mutablebson::DamageVector& damages ) = 0; /** - * Storage engines which do not support document-level locking hold locks at - * collection or database granularity. As an optimization, these locks can be yielded - * when a record needs to be fetched from secondary storage. If this method returns - * non-NULL, then it indicates that the query system layer should yield and reacquire its - * locks. + * Returns a new cursor over this record store. * - * The return value is a functor that should be invoked when the locks are yielded; - * it should access the record at 'loc' so that a potential page fault is triggered - * out of the lock. - * - * The caller is responsible for deleting the return value. 
- * - * Storage engines which support document-level locking need not implement this. + * The cursor is logically positioned before the first (or last if !forward) Record in the + * collection so that Record will be returned on the first call to next(). Implementations + * are allowed to lazily seek to the first Record when next() is called rather than doing + * it on construction. */ - virtual RecordFetcher* recordNeedsFetch( OperationContext* txn, - const RecordId& loc ) const { return NULL; } + virtual std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, + bool forward = true) const = 0; /** - * returned iterator owned by caller - * Default arguments return all items in record store. If this function is called - * twice on the same RecoveryUnit (in the OperationContext), without intervening - * reset of it, the iterator must be based on the same snapshot. - */ - virtual RecordIterator* getIterator( OperationContext* txn, - const RecordId& start = RecordId(), - const CollectionScanParams::Direction& dir = - CollectionScanParams::FORWARD - ) const = 0; - - /** - * Constructs an iterator over a potentially corrupted store, which can be used to salvage + * Constructs a cursor over a potentially corrupted store, which can be used to salvage * damaged records. The iterator might return every record in the store if all of them * are reachable and not corrupted. Returns NULL if not supported. + * + * Repair cursors are only required to support forward scanning, so it is illegal to call + * seekExact() on the returned cursor. */ - virtual RecordIterator* getIteratorForRepair( OperationContext* txn ) const { - return NULL; + virtual std::unique_ptr<RecordCursor> getCursorForRepair( OperationContext* txn ) const { + return {}; } /** - * Returns many iterators that partition the RecordStore into many disjoint sets. Iterating - * all returned iterators is equivalent to Iterating the full store. + * Returns many RecordCursors that partition the RecordStore into many disjoint sets. + * Iterating all returned RecordCursors is equivalent to iterating the full store. + * + * Partition cursors are only required to support forward scanning, so it is illegal to call + * seekExact() on any of the returned cursors. + * + * WARNING: the first call to restore() on each cursor may (but is not guaranteed to) be on + * a different RecoveryUnit than the initial save. This will be made more sane as part of + * SERVER-17364. 
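getManyCursors() partitions the store into disjoint forward-only cursors (the default implementation just falls back to a single full-scan cursor), so draining every returned cursor visits each record exactly once. A sketch of a partitioned count under that contract; countAllPartitions is hypothetical:

    // Sketch: counting records by draining each partition cursor in turn.
    long long countAllPartitions(OperationContext* txn, const RecordStore* rs) {
        long long n = 0;
        for (auto&& cursor : rs->getManyCursors(txn)) {  // disjoint, forward-only partitions
            while (auto record = cursor->next()) {       // never seekExact() on these cursors
                n++;
            }
        }
        return n;
    }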
*/ - virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const = 0; + virtual std::vector<std::unique_ptr<RecordCursor>> getManyCursors( + OperationContext* txn) const { + std::vector<std::unique_ptr<RecordCursor>> out(1); + out[0] = getCursor(txn); + return out; + } // higher level diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp index 466c0891251..149509abf63 100644 --- a/src/mongo/db/storage/record_store_test_harness.cpp +++ b/src/mongo/db/storage/record_store_test_harness.cpp @@ -417,29 +417,25 @@ namespace mongo { { int x = 0; scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get() ) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - RecordData data = it->dataFor( loc ); + auto cursor = rs->getCursor(opCtx.get()); + while (auto record = cursor->next()) { string s = str::stream() << "eliot" << x++; - ASSERT_EQUALS( s, data.data() ); + ASSERT_EQUALS(s, record->data.data()); } ASSERT_EQUALS( N, x ); + ASSERT(!cursor->next()); } { int x = N; scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), - RecordId(), - CollectionScanParams::BACKWARD ) ); - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - RecordData data = it->dataFor( loc ); + auto cursor = rs->getCursor(opCtx.get(), false); + while (auto record = cursor->next()) { string s = str::stream() << "eliot" << --x; - ASSERT_EQUALS( s, data.data() ); + ASSERT_EQUALS(s, record->data.data()); } ASSERT_EQUALS( 0, x ); + ASSERT(!cursor->next()); } } diff --git a/src/mongo/db/storage/record_store_test_manyiter.cpp b/src/mongo/db/storage/record_store_test_manyiter.cpp index ba31c2109e7..57723e23723 100644 --- a/src/mongo/db/storage/record_store_test_manyiter.cpp +++ b/src/mongo/db/storage/record_store_test_manyiter.cpp @@ -58,19 +58,9 @@ namespace mongo { { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - vector<RecordIterator*> v = rs->getManyIterators( opCtx.get() ); - - for (vector<RecordIterator*>::iterator vIter = v.begin(); - vIter != v.end(); vIter++) { - - RecordIterator *rIter = *vIter; - ASSERT( rIter->isEOF() ); - ASSERT_EQUALS( RecordId(), rIter->curr() ); - ASSERT_EQUALS( RecordId(), rIter->getNext() ); - ASSERT( rIter->isEOF() ); - ASSERT_EQUALS( RecordId(), rIter->curr() ); - - delete rIter; + for (auto&& cursor : rs->getManyCursors(opCtx.get())) { + ASSERT(!cursor->next()); + ASSERT(!cursor->next()); } } } @@ -113,24 +103,12 @@ namespace mongo { set<RecordId> remain( locs, locs + nToInsert ); { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - vector<RecordIterator*> v = rs->getManyIterators( opCtx.get() ); - - for (vector<RecordIterator*>::iterator vIter = v.begin(); - vIter != v.end(); vIter++) { - - RecordIterator *rIter = *vIter; - while ( !rIter->isEOF() ) { - RecordId loc = rIter->curr(); - ASSERT( 1 == remain.erase( loc ) ); - ASSERT_EQUALS( loc, rIter->getNext() ); + for (auto&& cursor : rs->getManyCursors(opCtx.get())) { + while (auto record = cursor->next()) { + ASSERT_EQ(remain.erase(record->id), size_t(1)); } - ASSERT_EQUALS( RecordId(), rIter->curr() ); - ASSERT_EQUALS( RecordId(), rIter->getNext() ); - ASSERT( rIter->isEOF() ); - ASSERT_EQUALS( RecordId(), rIter->curr() ); - - delete rIter; + ASSERT(!cursor->next()); } ASSERT( remain.empty() ); } diff --git 
a/src/mongo/db/storage/record_store_test_recorditer.cpp b/src/mongo/db/storage/record_store_test_recorditer.cpp index 4d67b6bfd90..63edb205c6a 100644 --- a/src/mongo/db/storage/record_store_test_recorditer.cpp +++ b/src/mongo/db/storage/record_store_test_recorditer.cpp @@ -59,6 +59,7 @@ namespace mongo { const int nToInsert = 10; RecordId locs[nToInsert]; + std::string datas[nToInsert]; for ( int i = 0; i < nToInsert; i++ ) { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); { @@ -73,6 +74,7 @@ namespace mongo { false ); ASSERT_OK( res.getStatus() ); locs[i] = res.getValue(); + datas[i] = data; uow.commit(); } } @@ -85,24 +87,14 @@ namespace mongo { std::sort( locs, locs + nToInsert ); // inserted records may not be in RecordId order { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - - RecordIterator *it = rs->getIterator( opCtx.get(), - RecordId(), - CollectionScanParams::FORWARD ); - + auto cursor = rs->getCursor(opCtx.get()); for ( int i = 0; i < nToInsert; i++ ) { - ASSERT( !it->isEOF() ); - ASSERT_EQUALS( locs[i], it->curr() ); - ASSERT_EQUALS( locs[i], it->getNext() ); + const auto record = cursor->next(); + ASSERT(record); + ASSERT_EQUALS( locs[i], record->id ); + ASSERT_EQUALS( datas[i], record->data.data() ); } - ASSERT( it->isEOF() ); - - ASSERT_EQUALS( RecordId(), it->curr() ); - ASSERT_EQUALS( RecordId(), it->getNext() ); - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - - delete it; + ASSERT(!cursor->next()); } } @@ -120,6 +112,7 @@ namespace mongo { const int nToInsert = 10; RecordId locs[nToInsert]; + std::string datas[nToInsert]; for ( int i = 0; i < nToInsert; i++ ) { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); { @@ -134,6 +127,7 @@ namespace mongo { false ); ASSERT_OK( res.getStatus() ); locs[i] = res.getValue(); + datas[i] = data; uow.commit(); } } @@ -147,23 +141,14 @@ namespace mongo { { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - RecordIterator *it = rs->getIterator( opCtx.get(), - RecordId(), - CollectionScanParams::BACKWARD ); - + auto cursor = rs->getCursor(opCtx.get(), false); for ( int i = nToInsert - 1; i >= 0; i-- ) { - ASSERT( !it->isEOF() ); - ASSERT_EQUALS( locs[i], it->curr() ); - ASSERT_EQUALS( locs[i], it->getNext() ); + const auto record = cursor->next(); + ASSERT(record); + ASSERT_EQUALS( locs[i], record->id ); + ASSERT_EQUALS( datas[i], record->data.data() ); } - ASSERT( it->isEOF() ); - - ASSERT_EQUALS( RecordId(), it->curr() ); - ASSERT_EQUALS( RecordId(), it->getNext() ); - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - - delete it; + ASSERT(!cursor->next()); } } @@ -180,6 +165,7 @@ namespace mongo { const int nToInsert = 10; RecordId locs[nToInsert]; + std::string datas[nToInsert]; for ( int i = 0; i < nToInsert; i++ ) { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); { @@ -194,6 +180,7 @@ namespace mongo { false ); ASSERT_OK( res.getStatus() ); locs[i] = res.getValue(); + datas[i] = data; uow.commit(); } } @@ -208,23 +195,14 @@ namespace mongo { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); int start = nToInsert / 2; - RecordIterator *it = rs->getIterator( opCtx.get(), - locs[start], - CollectionScanParams::FORWARD ); - + auto cursor = rs->getCursor(opCtx.get()); for ( int i = start; i < nToInsert; i++ ) { - ASSERT( !it->isEOF() ); - ASSERT_EQUALS( locs[i], it->curr() ); - ASSERT_EQUALS( locs[i], it->getNext() ); + const auto 
record = (i == start) ? cursor->seekExact(locs[i]) : cursor->next(); + ASSERT(record); + ASSERT_EQUALS( locs[i], record->id ); + ASSERT_EQUALS( datas[i], record->data.data() ); } - ASSERT( it->isEOF() ); - - ASSERT_EQUALS( RecordId(), it->curr() ); - ASSERT_EQUALS( RecordId(), it->getNext() ); - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - - delete it; + ASSERT(!cursor->next()); } } @@ -241,6 +219,7 @@ namespace mongo { const int nToInsert = 10; RecordId locs[nToInsert]; + std::string datas[nToInsert]; for ( int i = 0; i < nToInsert; i++ ) { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); { @@ -255,6 +234,7 @@ namespace mongo { false ); ASSERT_OK( res.getStatus() ); locs[i] = res.getValue(); + datas[i] = data; uow.commit(); } } @@ -269,23 +249,14 @@ namespace mongo { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); int start = nToInsert / 2; - RecordIterator *it = rs->getIterator( opCtx.get(), - locs[start], - CollectionScanParams::BACKWARD ); - + auto cursor = rs->getCursor(opCtx.get(), false); for ( int i = start; i >= 0; i-- ) { - ASSERT( !it->isEOF() ); - ASSERT_EQUALS( locs[i], it->curr() ); - ASSERT_EQUALS( locs[i], it->getNext() ); + const auto record = (i == start) ? cursor->seekExact(locs[i]) : cursor->next(); + ASSERT(record); + ASSERT_EQUALS( locs[i], record->id ); + ASSERT_EQUALS( datas[i], record->data.data() ); } - ASSERT( it->isEOF() ); - - ASSERT_EQUALS( RecordId(), it->curr() ); - ASSERT_EQUALS( RecordId(), it->getNext() ); - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - - delete it; + ASSERT(!cursor->next()); } } @@ -303,6 +274,7 @@ namespace mongo { const int nToInsert = 10; RecordId locs[nToInsert]; + std::string datas[nToInsert]; for ( int i = 0; i < nToInsert; i++ ) { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); { @@ -317,6 +289,7 @@ namespace mongo { false ); ASSERT_OK( res.getStatus() ); locs[i] = res.getValue(); + datas[i] = data; uow.commit(); } } @@ -330,19 +303,19 @@ namespace mongo { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); // Get a forward iterator starting at the beginning of the record store. - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get() ) ); + auto cursor = rs->getCursor(opCtx.get()); // Iterate, checking EOF along the way. for ( int i = 0; i < nToInsert; i++ ) { - ASSERT( !it->isEOF() ); - RecordId nextLoc = it->getNext(); - ASSERT( !nextLoc.isNull() ); + const auto record = cursor->next(); + ASSERT(record); + ASSERT_EQUALS( locs[i], record->id ); + ASSERT_EQUALS( datas[i], record->data.data() ); } - ASSERT( it->isEOF() ); - ASSERT( it->getNext().isNull() ); + ASSERT(!cursor->next()); // Add a record and ensure we're still EOF. - it->saveState(); + cursor->savePositioned(); StringBuilder sb; sb << "record " << nToInsert + 1; @@ -356,11 +329,74 @@ namespace mongo { ASSERT_OK( res.getStatus() ); uow.commit(); - ASSERT( it->restoreState( opCtx.get() ) ); + ASSERT( cursor->restore( opCtx.get() ) ); // Iterator should still be EOF. 
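The start-in-the-middle tests above show what replaces the old RecordId start argument to getIterator(): the caller positions the cursor explicitly with seekExact() and then advances with next(). A minimal sketch of that idiom, assuming placeholder names rs, txn, startLoc and use() for a RecordStore, an OperationContext, a previously saved RecordId and some consumer:

    auto cursor = rs->getCursor(txn);                 // forward scan
    if (auto record = cursor->seekExact(startLoc)) {  // empty optional if startLoc no longer exists
        use(record->id, record->data);
        while (auto rec = cursor->next())             // records strictly after startLoc
            use(rec->id, rec->data);
    }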
- ASSERT( it->isEOF() ); - ASSERT( it->getNext().isNull() ); + ASSERT(!cursor->next()); + ASSERT(!cursor->next()); + } + } + + // Test calling savePositioned and restore after each call to next + TEST( RecordStoreTestHarness, RecordIteratorSavePositionedRestore ) { + scoped_ptr<HarnessHelper> harnessHelper( newHarnessHelper() ); + scoped_ptr<RecordStore> rs( harnessHelper->newNonCappedRecordStore() ); + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + ASSERT_EQUALS( 0, rs->numRecords( opCtx.get() ) ); + } + + const int nToInsert = 10; + RecordId locs[nToInsert]; + std::string datas[nToInsert]; + for ( int i = 0; i < nToInsert; i++ ) { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + { + StringBuilder sb; + sb << "record " << i; + string data = sb.str(); + + WriteUnitOfWork uow( opCtx.get() ); + StatusWith<RecordId> res = rs->insertRecord( opCtx.get(), + data.c_str(), + data.size() + 1, + false ); + ASSERT_OK( res.getStatus() ); + locs[i] = res.getValue(); + datas[i] = data; + uow.commit(); + } + } + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + ASSERT_EQUALS( nToInsert, rs->numRecords( opCtx.get() ) ); + } + + { + scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); + + // Get a forward iterator starting at the beginning of the record store. + auto cursor = rs->getCursor(opCtx.get()); + + // Iterate, checking EOF along the way. + for ( int i = 0; i < nToInsert; i++ ) { + cursor->savePositioned(); + cursor->savePositioned(); // It is legal to save twice in a row. + cursor->restore(opCtx.get()); + + const auto record = cursor->next(); + ASSERT(record); + ASSERT_EQUALS( locs[i], record->id ); + ASSERT_EQUALS( datas[i], record->data.data() ); + } + + cursor->savePositioned(); + cursor->savePositioned(); // It is legal to save twice in a row. 
+ cursor->restore(opCtx.get()); + + ASSERT(!cursor->next()); } } diff --git a/src/mongo/db/storage/record_store_test_repairiter.cpp b/src/mongo/db/storage/record_store_test_repairiter.cpp index 12f035aff32..abb6e5f6bb3 100644 --- a/src/mongo/db/storage/record_store_test_repairiter.cpp +++ b/src/mongo/db/storage/record_store_test_repairiter.cpp @@ -56,19 +56,12 @@ namespace mongo { { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - RecordIterator *it = rs->getIteratorForRepair( opCtx.get() ); - - // returns NULL if getIteratorForRepair is not supported - if (it == NULL) { + auto cursor = rs->getCursorForRepair( opCtx.get() ); + // returns NULL if getCursorForRepair is not supported + if (!cursor) { return; } - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - ASSERT_EQUALS( RecordId(), it->getNext() ); - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - - delete it; + ASSERT(!cursor->next()); } } @@ -111,24 +104,18 @@ namespace mongo { set<RecordId> remain( locs, locs + nToInsert ); { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - RecordIterator *it = rs->getIteratorForRepair( opCtx.get() ); - // returns NULL if getIteratorForRepair is not supported - if (it == NULL) { + auto cursor = rs->getCursorForRepair( opCtx.get() ); + // returns NULL if getCursorForRepair is not supported + if (!cursor) { return; } - while ( !it->isEOF() ) { - RecordId loc = it->getNext(); - remain.erase( loc ); // can happen more than once per doc + while (auto record = cursor->next()) { + remain.erase(record->id); // can happen more than once per doc } ASSERT( remain.empty() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - ASSERT_EQUALS( RecordId(), it->getNext() ); - ASSERT( it->isEOF() ); - ASSERT_EQUALS( RecordId(), it->curr() ); - - delete it; + ASSERT(!cursor->next()); } } @@ -163,24 +150,21 @@ namespace mongo { { scoped_ptr<OperationContext> opCtx(harnessHelper->newOperationContext()); - scoped_ptr<RecordIterator> it(rs->getIteratorForRepair(opCtx.get())); - // Return value of NULL is expected if getIteratorForRepair is not supported. - if (!it) { + auto cursor = rs->getCursorForRepair( opCtx.get() ); + // returns NULL if getCursorForRepair is not supported + if (!cursor) { return; } // We should be pointing at the only record in the store. - ASSERT_EQ(idToInvalidate, it->curr()); - ASSERT(!it->isEOF()); // Invalidate the record we're pointing at. - it->saveState(); - it->invalidate(idToInvalidate); - it->restoreState(opCtx.get()); + cursor->savePositioned(); + cursor->invalidate(idToInvalidate); + cursor->restore(opCtx.get()); // Iterator should be EOF now because the only thing in the collection got deleted. 
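In the repair-cursor test just above, invalidate() is how a saved cursor learns that a record it may be positioned on is going away, so restore() will not reposition onto it. A hedged sketch of that sequence (cursor, txn and deletedId are placeholder names; the actual deletion of the record is elided):

    cursor->savePositioned();        // detach from the current position first
    cursor->invalidate(deletedId);   // the saved cursor forgets this RecordId
    // ... deletedId is removed from the record store here ...
    cursor->restore(txn);            // repositions without ever returning deletedId;
                                     // if nothing is left, the next call to next() reports EOF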
- ASSERT(it->isEOF()); - ASSERT_EQ(it->getNext(), RecordId()); + ASSERT(!cursor->next()); } } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 5739df724ce..67faf2ae9ef 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -50,6 +50,7 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" +#include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" @@ -89,6 +90,195 @@ namespace { const long long WiredTigerRecordStore::kCollectionScanOnCreationThreshold = 10000; + class WiredTigerRecordStore::Cursor final : public RecordCursor { + public: + Cursor(OperationContext* txn, + const WiredTigerRecordStore& rs, + bool forward = true, + bool forParallelCollectionScan = false) + : _rs(rs) + , _txn(txn) + , _forward(forward) + , _forParallelCollectionScan(forParallelCollectionScan) + , _cursor(new WiredTigerCursor(rs.getURI(), rs.instanceId(), true, txn)) + , _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) + {} + + boost::optional<Record> next() final { + if (_eof) return {}; + + WT_CURSOR* c = _cursor->get(); + { + // Nothing after the next line can throw WCEs. + // Note that an unpositioned (or eof) WT_CURSOR returns the first/last entry in the + // table when you call next/prev. + int advanceRet = WT_OP_CHECK(_forward ? c->next(c) : c->prev(c)); + if (advanceRet == WT_NOTFOUND) { + _eof = true; + return {}; + } + invariantWTOK(advanceRet); + } + + int64_t key; + invariantWTOK(c->get_key(c, &key)); + const RecordId id = _fromKey(key); + + if (!isVisible(id)) { + _eof = true; + return {}; + } + + WT_ITEM value; + invariantWTOK(c->get_value(c, &value)); + auto data = RecordData(static_cast<const char*>(value.data), value.size); + data.makeOwned(); // TODO delete this line once safe. + + _lastReturnedId = id; + return {{id, std::move(data)}}; + } + + boost::optional<Record> seekExact(const RecordId& id) final { + if (!isVisible(id)) { + _eof = true; + return {}; + } + + WT_CURSOR* c = _cursor->get(); + c->set_key(c, _makeKey(id)); + // Nothing after the next line can throw WCEs. + int seekRet = WT_OP_CHECK(c->search(c)); + if (seekRet == WT_NOTFOUND) { + _eof = true; + return {}; + } + invariantWTOK(seekRet); + + WT_ITEM value; + invariantWTOK(c->get_value(c, &value)); + auto data = RecordData(static_cast<const char*>(value.data), value.size); + data.makeOwned(); // TODO delete this line once safe. + + _lastReturnedId = id; + return {{id, std::move(data)}}; + } + + void savePositioned() final { + // It must be safe to call save() twice in a row without calling restore(). + if (!_txn) return; + + // the cursor and recoveryUnit are valid on restore + // so we just record the recoveryUnit to make sure + _savedRecoveryUnit = _txn->recoveryUnit(); + if ( _cursor && !wt_keeptxnopen() ) { + try { + _cursor->reset(); + } + catch (const WriteConflictException& wce) { + // Ignore since this is only called when we are about to kill our transaction + // anyway. 
+ } + } + + if (_forParallelCollectionScan) { + // Delete the cursor since we may come back to a different RecoveryUnit + _cursor.reset(); + } + _txn = nullptr; + } + + void saveUnpositioned() final { + savePositioned(); + _lastReturnedId = RecordId(); + } + + bool restore(OperationContext* txn) final { + _txn = txn; + + // If we've hit EOF, then this iterator is done and need not be restored. + if (_eof) return true; + + bool needRestore = false; + + if (_forParallelCollectionScan) { + needRestore = true; + _savedRecoveryUnit = txn->recoveryUnit(); + _cursor.reset( new WiredTigerCursor( _rs.getURI(), _rs.instanceId(), true, txn ) ); + _forParallelCollectionScan = false; // we only do this the first time + } + invariant( _savedRecoveryUnit == txn->recoveryUnit() ); + + if (!needRestore && wt_keeptxnopen()) return true; + if (_lastReturnedId.isNull()) return true; + + // This will ensure an active session exists, so any restored cursors will bind to it + invariant(WiredTigerRecoveryUnit::get(txn)->getSession(txn) == _cursor->getSession()); + + WT_CURSOR* c = _cursor->get(); + c->set_key(c, _makeKey(_lastReturnedId)); + + int cmp; + int ret = WT_OP_CHECK(c->search_near(c, &cmp)); + if (ret == WT_NOTFOUND) { + _eof = true; + return !_rs._isCapped; + } + invariantWTOK(ret); + + if (cmp == 0) return true; // Landed right where we left off. + + if (_rs._isCapped) { + // Doc was deleted either by cappedDeleteAsNeeded() or cappedTruncateAfter(). + // It is important that we error out in this case so that consumers don't + // silently get 'holes' when scanning capped collections. We don't make + // this guarantee for normal collections so it is ok to skip ahead in that case. + _eof = true; + return false; + } + + if (_forward && cmp > 0) { + // We landed after where we were. Move back one so that next() will return this + // document. + ret = WT_OP_CHECK(c->prev(c)); + } + else if (!_forward && cmp < 0) { + // Do the opposite for reverse cursors. + ret = WT_OP_CHECK(c->next(c)); + } + if (ret != WT_NOTFOUND) invariantWTOK(ret); + + return true; + } + + private: + bool isVisible(const RecordId& id) { + if (!_rs._isCapped) return true; + + if ( _readUntilForOplog.isNull() || !_rs._isOplog ) { + // this is the normal capped case + return !_rs.isCappedHidden(id); + } + + // this is for oplogs + if (id == _readUntilForOplog) { + // we allow if its been committed already + return !_rs.isCappedHidden(id); + } + + return id < _readUntilForOplog; + } + + const WiredTigerRecordStore& _rs; + OperationContext* _txn; + RecoveryUnit* _savedRecoveryUnit; // only used to sanity check between save/restore. + const bool _forward; + bool _forParallelCollectionScan; // This can go away once SERVER-17364 is resolved. + std::unique_ptr<WiredTigerCursor> _cursor; + bool _eof = false; + RecordId _lastReturnedId; + const RecordId _readUntilForOplog; + }; + StatusWith<std::string> WiredTigerRecordStore::parseOptionsField(const BSONObj options) { StringBuilder ss; BSONForEach(elem, options) { @@ -207,20 +397,10 @@ namespace { } // Find the largest RecordId currently in use and estimate the number of records. 
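Cursor::restore() above is where the new yield protocol differs most from restoreState() on the old iterator: a false return means a capped collection truncated the saved position, and the caller must treat the cursor as dead rather than silently skip ahead. A rough caller-side sketch, with cursor, txn and doYield as placeholder names:

    cursor->savePositioned();   // legal to call more than once in a row
    doYield(txn);               // e.g. release locks or abandon the snapshot (placeholder)
    if (!cursor->restore(txn)) {
        // saved position fell off a capped collection; stop the scan
        return;
    }
    while (auto record = cursor->next()) {
        // continue where the scan left off
    }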
- scoped_ptr<RecordIterator> iterator( getIterator( ctx, RecordId(), - CollectionScanParams::BACKWARD ) ); - if ( iterator->isEOF() ) { - _dataSize.store(0); - _numRecords.store(0); - // Need to start at 1 so we are always higher than RecordId::min() - _nextIdNum.store( 1 ); - if ( sizeStorer ) - _sizeStorer->onCreate( this, 0, 0 ); - } - else { - RecordId maxLoc = iterator->curr(); - int64_t max = _makeKey( maxLoc ); - _oplog_highestSeen = maxLoc; + Cursor cursor(ctx, *this, /*forward=*/false); + if (auto record = cursor.next()) { + int64_t max = _makeKey(record->id); + _oplog_highestSeen = record->id; _nextIdNum.store( 1 + max ); if ( _sizeStorer ) { @@ -238,12 +418,10 @@ namespace { _numRecords.store(0); _dataSize.store(0); - while( !iterator->isEOF() ) { - RecordId loc = iterator->getNext(); - RecordData data = iterator->dataFor( loc ); + do { _numRecords.fetchAndAdd(1); - _dataSize.fetchAndAdd(data.size()); - } + _dataSize.fetchAndAdd(record->data.size()); + } while ((record = cursor.next())); if ( _sizeStorer ) { _sizeStorer->storeToCache( _uri, _numRecords.load(), _dataSize.load() ); @@ -251,6 +429,14 @@ namespace { } } + else { + _dataSize.store(0); + _numRecords.store(0); + // Need to start at 1 so we are always higher than RecordId::min() + _nextIdNum.store( 1 ); + if ( sizeStorer ) + _sizeStorer->onCreate( this, 0, 0 ); + } _hasBackgroundThread = WiredTigerKVEngine::initRsOplogBackgroundThread(ns); } @@ -688,12 +874,10 @@ namespace { } } - RecordIterator* WiredTigerRecordStore::getIterator( - OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir) const { + std::unique_ptr<RecordCursor> WiredTigerRecordStore::getCursor(OperationContext* txn, + bool forward) const { - if ( _isOplog && dir == CollectionScanParams::FORWARD ) { + if ( _isOplog && forward ) { WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn); if ( !wru->inActiveTxn() || wru->getOplogReadTill().isNull() ) { // if we don't have a session, we have no snapshot, so we can update our view @@ -701,20 +885,15 @@ namespace { } } - return new Iterator(*this, txn, start, dir, false); + return stdx::make_unique<Cursor>(txn, *this, forward); } - - std::vector<RecordIterator*> WiredTigerRecordStore::getManyIterators( - OperationContext* txn ) const { - - // XXX do we want this to actually return a set of iterators? 
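getManyIterators(), removed here, is superseded by getManyCursors(), whose WiredTiger implementation appears just below; the returned cursors partition the store, so draining each one visits every record exactly once. A small usage sketch (rs, txn and visit() are placeholder names):

    for (auto&& partition : rs->getManyCursors(txn)) {
        while (auto record = partition->next())
            visit(record->id, record->data);   // each RecordId shows up in exactly one partition
    }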
- - std::vector<RecordIterator*> iterators; - iterators.push_back( new Iterator(*this, txn, RecordId(), - CollectionScanParams::FORWARD, true) ); - - return iterators; + std::vector<std::unique_ptr<RecordCursor>> WiredTigerRecordStore::getManyCursors( + OperationContext* txn) const { + std::vector<std::unique_ptr<RecordCursor>> cursors(1); + cursors[0] = stdx::make_unique<Cursor>(txn, *this, /*forward=*/true, + /*forParallelCollectionScan=*/true); + return cursors; } Status WiredTigerRecordStore::truncate( OperationContext* txn ) { @@ -776,22 +955,19 @@ namespace { long long nrecords = 0; long long dataSizeTotal = 0; - boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); results->valid = true; - while( !iter->isEOF() ) { + Cursor cursor(txn, *this, true); + while (auto record = cursor.next()) { ++nrecords; if ( full && scanData ) { size_t dataSize; - RecordId loc = iter->curr(); - RecordData data = dataFor( txn, loc ); - Status status = adaptor->validate( data, &dataSize ); + Status status = adaptor->validate( record->data, &dataSize ); if ( !status.isOK() ) { results->valid = false; - results->errors.push_back( str::stream() << loc << " is corrupted" ); + results->errors.push_back( str::stream() << record->id << " is corrupted" ); } dataSizeTotal += static_cast<long long>(dataSize); } - iter->getNext(); } if (_sizeStorer && full && scanData && results->valid) { @@ -1019,254 +1195,13 @@ namespace { return RecordId(key); } - // -------- - - WiredTigerRecordStore::Iterator::Iterator( - const WiredTigerRecordStore& rs, - OperationContext *txn, - const RecordId& start, - const CollectionScanParams::Direction& dir, - bool forParallelCollectionScan) - : _rs( rs ), - _txn( txn ), - _forward( dir == CollectionScanParams::FORWARD ), - _forParallelCollectionScan( forParallelCollectionScan ), - _cursor( new WiredTigerCursor( rs.getURI(), rs.instanceId(), true, txn ) ), - _eof(false), - _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) { - RS_ITERATOR_TRACE("start"); - _locate(start, true); - } - - WiredTigerRecordStore::Iterator::~Iterator() { - } - - void WiredTigerRecordStore::Iterator::_locate(const RecordId &loc, bool exact) { - RS_ITERATOR_TRACE("_locate " << loc); - WT_CURSOR *c = _cursor->get(); - invariant( c ); - int ret; - if (loc.isNull()) { - ret = WT_OP_CHECK(_forward ? c->next(c) : c->prev(c)); - _eof = (ret == WT_NOTFOUND); - if (!_eof) invariantWTOK(ret); - _loc = _curr(); - - RS_ITERATOR_TRACE("_locate null loc eof: " << _eof); - return; - } - - c->set_key(c, _makeKey(loc)); - if (exact) { - ret = WT_OP_CHECK(c->search(c)); - } - else { - // If loc doesn't exist, inexact matches should find the first existing record before - // it, in the direction of the scan. Note that inexact callers will call _getNext() - // after locate so they actually return the record *after* the one we seek to. 
- int cmp; - ret = WT_OP_CHECK(c->search_near(c, &cmp)); - if ( ret == WT_NOTFOUND ) { - _eof = true; - _loc = RecordId(); - return; - } - invariantWTOK(ret); - if (_forward) { - // return >= loc - if (cmp < 0) - ret = WT_OP_CHECK(c->next(c)); - } - else { - // return <= loc - if (cmp > 0) - ret = WT_OP_CHECK(c->prev(c)); - } - } - if (ret != WT_NOTFOUND) invariantWTOK(ret); - _eof = (ret == WT_NOTFOUND); - _loc = _curr(); - RS_ITERATOR_TRACE("_locate not null loc eof: " << _eof); - } - - bool WiredTigerRecordStore::Iterator::isEOF() { - RS_ITERATOR_TRACE( "isEOF " << _eof << " " << _lastLoc ); - return _eof; - } - - // Allow const functions to use curr to find current location. - RecordId WiredTigerRecordStore::Iterator::_curr() const { - RS_ITERATOR_TRACE( "_curr" ); - if (_eof) - return RecordId(); - - WT_CURSOR *c = _cursor->get(); - dassert( c ); - int64_t key; - int ret = c->get_key(c, &key); - invariantWTOK(ret); - return _fromKey(key); - } - - RecordId WiredTigerRecordStore::Iterator::curr() { - return _loc; - } - - void WiredTigerRecordStore::Iterator::_getNext() { - // Once you go EOF you never go back. - if (_eof) return; - - RS_ITERATOR_TRACE("_getNext"); - WT_CURSOR *c = _cursor->get(); - int ret = WT_OP_CHECK(_forward ? c->next(c) : c->prev(c)); - _eof = (ret == WT_NOTFOUND); - RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof ); - if ( !_eof ) { - RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof << " " << _curr() ); - invariantWTOK(ret); - _loc = _curr(); - RS_ITERATOR_TRACE("_getNext " << ret << " " << _eof << " " << _loc ); - if ( _rs._isCapped ) { - RecordId loc = _curr(); - if ( _readUntilForOplog.isNull() ) { - // this is the normal capped case - if ( _rs.isCappedHidden( loc ) ) { - _eof = true; - } - } - else { - // this is for oplogs - if ( loc > _readUntilForOplog ) { - _eof = true; - } - else if ( loc == _readUntilForOplog && _rs.isCappedHidden( loc ) ) { - // we allow if its been commited already - _eof = true; - } - } - } - } - - if (_eof) { - _loc = RecordId(); - } - } - - RecordId WiredTigerRecordStore::Iterator::getNext() { - RS_ITERATOR_TRACE( "getNext" ); - const RecordId toReturn = _loc; - RS_ITERATOR_TRACE( "getNext toReturn: " << toReturn ); - _getNext(); - RS_ITERATOR_TRACE( " ----" ); - _lastLoc = toReturn; - return toReturn; - } - - void WiredTigerRecordStore::Iterator::invalidate( const RecordId& dl ) { - // this should never be called - } - - void WiredTigerRecordStore::Iterator::saveState() { - RS_ITERATOR_TRACE("saveState"); - - // It must be safe to call saveState() twice in a row without calling restoreState(). - if (!_txn) return; - - // the cursor and recoveryUnit are valid on restore - // so we just record the recoveryUnit to make sure - _savedRecoveryUnit = _txn->recoveryUnit(); - if ( _cursor && !wt_keeptxnopen() ) { - try { - _cursor->reset(); - } - catch (const WriteConflictException& wce) { - // Ignore since this is only called when we are about to kill our transaction - // anyway. - } - } - - if ( _forParallelCollectionScan ) { - _cursor.reset( NULL ); - } - _txn = NULL; - } - - bool WiredTigerRecordStore::Iterator::restoreState( OperationContext *txn ) { - - // This is normally already the case, but sometimes we are given a new - // OperationContext on restore - update the iterators context in that - // case - _txn = txn; - - // If we've hit EOF, then this iterator is done and need not be restored. 
- if ( _eof ) { - return true; - } - - bool needRestore = false; - - if ( _forParallelCollectionScan ) { - // parallel collection scan or something - needRestore = true; - _savedRecoveryUnit = txn->recoveryUnit(); - _cursor.reset( new WiredTigerCursor( _rs.getURI(), _rs.instanceId(), true, txn ) ); - _forParallelCollectionScan = false; // we only do this the first time - } - - invariant( _savedRecoveryUnit == txn->recoveryUnit() ); - if ( needRestore || !wt_keeptxnopen() ) { - // This will ensure an active session exists, so any restored cursors will bind to it - invariant(WiredTigerRecoveryUnit::get(txn)->getSession(txn) == _cursor->getSession()); - - RecordId saved = _lastLoc; - _locate(_lastLoc, false); - RS_ITERATOR_TRACE( "isEOF check " << _eof ); - if ( _eof ) { - _lastLoc = RecordId(); - } - else if ( _loc != saved ) { - if (_rs._isCapped && _lastLoc != RecordId()) { - // Doc was deleted either by cappedDeleteAsNeeded() or cappedTruncateAfter(). - // It is important that we error out in this case so that consumers don't - // silently get 'holes' when scanning capped collections. We don't make - // this guarantee for normal collections so it is ok to skip ahead in that case. - _eof = true; - return false; - } - // lastLoc was either deleted or never set (yielded before first call to getNext()), - // so bump ahead to the next record. - } - else { - // we found where we left off! - // now we advance to the next one - RS_ITERATOR_TRACE( "isEOF found " << _curr() ); - _getNext(); - } - } - - return true; - } - - RecordData WiredTigerRecordStore::Iterator::dataFor( const RecordId& loc ) const { - // Retrieve the data if the iterator is already positioned at loc, otherwise - // open a new cursor and find the data to avoid upsetting the iterators - // cursor position. 
- if (loc == _loc) { - dassert(loc == _curr()); - return _rs._getData(*_cursor); - } - else { - return _rs.dataFor( _txn, loc ); - } - } - void WiredTigerRecordStore::temp_cappedTruncateAfter( OperationContext* txn, RecordId end, bool inclusive ) { WriteUnitOfWork wuow(txn); - boost::scoped_ptr<RecordIterator> iter( getIterator( txn, end ) ); - while( !iter->isEOF() ) { - RecordId loc = iter->getNext(); + Cursor cursor(txn, *this); + while (auto record = cursor.next()) { + RecordId loc = record->id; if ( end < loc || ( inclusive && end == loc ) ) { deleteRecord( txn, loc ); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 5447425ca9b..e613868bce2 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -146,12 +146,9 @@ namespace mongo { const char* damageSource, const mutablebson::DamageVector& damages ); - virtual RecordIterator* getIterator( OperationContext* txn, - const RecordId& start = RecordId(), - const CollectionScanParams::Direction& dir = - CollectionScanParams::FORWARD ) const; - - virtual std::vector<RecordIterator*> getManyIterators( OperationContext* txn ) const; + std::unique_ptr<RecordCursor> getCursor(OperationContext* txn, bool forward) const final; + std::vector<std::unique_ptr<RecordCursor>> getManyCursors( + OperationContext* txn) const final; virtual Status truncate( OperationContext* txn ); @@ -213,43 +210,9 @@ namespace mongo { const RecordId& justInserted); boost::timed_mutex& cappedDeleterMutex() { return _cappedDeleterMutex; } - private: - class Iterator : public RecordIterator { - public: - Iterator( const WiredTigerRecordStore& rs, - OperationContext* txn, - const RecordId& start, - const CollectionScanParams::Direction& dir, - bool forParallelCollectionScan ); - - virtual ~Iterator(); - - virtual bool isEOF(); - virtual RecordId curr(); - virtual RecordId getNext(); - virtual void invalidate(const RecordId& dl); - virtual void saveState(); - virtual bool restoreState(OperationContext *txn); - virtual RecordData dataFor( const RecordId& loc ) const; - - private: - void _getNext(); - void _locate( const RecordId &loc, bool exact ); - RecordId _curr() const; // const version of public curr method - - const WiredTigerRecordStore& _rs; - OperationContext* _txn; - RecoveryUnit* _savedRecoveryUnit; // only used to sanity check between save/restore - const bool _forward; - bool _forParallelCollectionScan; - boost::scoped_ptr<WiredTigerCursor> _cursor; - bool _eof; - const RecordId _readUntilForOplog; - - RecordId _loc; // Cached key of _cursor. Update any time _cursor is moved. 
- RecordId _lastLoc; // the last thing returned from getNext() - }; + private: + class Cursor; class CappedInsertChange; class NumRecordsChange; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 5239c9d24f8..7bd8728109e 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -657,10 +657,10 @@ namespace { { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), loc1 ) ); - ASSERT( !it->isEOF() ); - ASSERT_EQ( loc1, it->getNext() ); - ASSERT( it->isEOF() ); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekExact(loc1); + ASSERT_EQ( loc1, record->id ); + ASSERT(!cursor->next()); } { @@ -682,10 +682,10 @@ namespace { { // state should be the same scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), loc1 ) ); - ASSERT( !it->isEOF() ); - ASSERT_EQ( loc1, it->getNext() ); - ASSERT( it->isEOF() ); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekExact(loc1); + ASSERT_EQ( loc1, record->id ); + ASSERT(!cursor->next()); } w1->commit(); @@ -693,14 +693,12 @@ namespace { { // now all 3 docs should be visible scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), loc1 ) ); - ASSERT( !it->isEOF() ); - ASSERT_EQ( loc1, it->getNext() ); - ASSERT( !it->isEOF() ); - it->getNext(); - ASSERT( !it->isEOF() ); - it->getNext(); - ASSERT( it->isEOF() ); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekExact(loc1); + ASSERT_EQ( loc1, record->id ); + ASSERT(cursor->next()); + ASSERT(cursor->next()); + ASSERT(!cursor->next()); } } @@ -720,12 +718,9 @@ namespace { // set up our cursor that should rollover scoped_ptr<OperationContext> cursorCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it; - it.reset( rs->getIterator(cursorCtx.get()) ); - ASSERT_FALSE(it->isEOF()); - it->getNext(); - ASSERT_FALSE(it->isEOF()); - it->saveState(); + auto cursor = rs->getCursor(cursorCtx.get()); + ASSERT(cursor->next()); + cursor->savePositioned(); cursorCtx->recoveryUnit()->abandonSnapshot(); { // insert 100 documents which causes rollover @@ -739,8 +734,8 @@ namespace { } // cursor should now be dead - ASSERT_FALSE(it->restoreState(cursorCtx.get())); - ASSERT_TRUE(it->isEOF()); + ASSERT_FALSE(cursor->restore(cursorCtx.get())); + ASSERT(!cursor->next()); } RecordId _oplogOrderInsertOplog( OperationContext* txn, @@ -781,10 +776,10 @@ namespace { { scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), loc1 ) ); - ASSERT( !it->isEOF() ); - ASSERT_EQ( loc1, it->getNext() ); - ASSERT( it->isEOF() ); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekExact(loc1); + ASSERT_EQ( loc1, record->id ); + ASSERT(!cursor->next()); } { @@ -806,10 +801,10 @@ namespace { { // state should be the same scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), loc1 ) ); - ASSERT( !it->isEOF() ); - ASSERT_EQ( loc1, it->getNext() ); - ASSERT( it->isEOF() ); + auto cursor = rs->getCursor(opCtx.get()); + auto record = 
cursor->seekExact(loc1); + ASSERT_EQ( loc1, record->id ); + ASSERT(!cursor->next()); } w1->commit(); @@ -817,14 +812,12 @@ namespace { { // now all 3 docs should be visible scoped_ptr<OperationContext> opCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator( opCtx.get(), loc1 ) ); - ASSERT( !it->isEOF() ); - ASSERT_EQ( loc1, it->getNext() ); - ASSERT( !it->isEOF() ); - it->getNext(); - ASSERT( !it->isEOF() ); - it->getNext(); - ASSERT( it->isEOF() ); + auto cursor = rs->getCursor(opCtx.get()); + auto record = cursor->seekExact(loc1); + ASSERT_EQ( loc1, record->id ); + ASSERT(cursor->next()); + ASSERT(cursor->next()); + ASSERT(!cursor->next()); } } @@ -876,15 +869,15 @@ namespace { } scoped_ptr<OperationContext> cursorCtx( harnessHelper->newOperationContext() ); - scoped_ptr<RecordIterator> it( rs->getIterator(cursorCtx.get()) ); - ASSERT_FALSE(it->isEOF()); + auto cursor = rs->getCursor(cursorCtx.get()); // See that things work if you yield before you first call getNext(). - it->saveState(); + cursor->savePositioned(); cursorCtx->recoveryUnit()->abandonSnapshot(); - ASSERT_TRUE(it->restoreState(cursorCtx.get())); - ASSERT_EQ(loc1, it->getNext()); - ASSERT_TRUE(it->isEOF()); + ASSERT_TRUE(cursor->restore(cursorCtx.get())); + auto record = cursor->next(); + ASSERT_EQ( loc1, record->id ); + ASSERT(!cursor->next()); } } // namespace mongo diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp index 2b43feb3b32..29db2eaf104 100644 --- a/src/mongo/dbtests/query_stage_and.cpp +++ b/src/mongo/dbtests/query_stage_and.cpp @@ -82,13 +82,10 @@ namespace QueryStageAnd { } void getLocs(set<RecordId>* out, Collection* coll) { - RecordIterator* it = coll->getIterator(&_txn, RecordId(), - CollectionScanParams::FORWARD); - while (!it->isEOF()) { - RecordId nextLoc = it->getNext(); - out->insert(nextLoc); + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + out->insert(record->id); } - delete it; } void insert(const BSONObj& obj) { diff --git a/src/mongo/dbtests/query_stage_fetch.cpp b/src/mongo/dbtests/query_stage_fetch.cpp index c8617a59acf..e33d9b08420 100644 --- a/src/mongo/dbtests/query_stage_fetch.cpp +++ b/src/mongo/dbtests/query_stage_fetch.cpp @@ -62,12 +62,10 @@ namespace QueryStageFetch { } void getLocs(set<RecordId>* out, Collection* coll) { - RecordIterator* it = coll->getIterator(&_txn); - while (!it->isEOF()) { - RecordId nextLoc = it->getNext(); - out->insert(nextLoc); + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + out->insert(record->id); } - delete it; } void insert(const BSONObj& obj) { diff --git a/src/mongo/dbtests/query_stage_keep.cpp b/src/mongo/dbtests/query_stage_keep.cpp index f78a2977300..743d1f7bdd2 100644 --- a/src/mongo/dbtests/query_stage_keep.cpp +++ b/src/mongo/dbtests/query_stage_keep.cpp @@ -66,12 +66,10 @@ namespace QueryStageKeep { } void getLocs(set<RecordId>* out, Collection* coll) { - RecordIterator* it = coll->getIterator(&_txn); - while (!it->isEOF()) { - RecordId nextLoc = it->getNext(); - out->insert(nextLoc); + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + out->insert(record->id); } - delete it; } void insert(const BSONObj& obj) { @@ -154,7 +152,10 @@ namespace QueryStageKeep { ASSERT_EQUALS(member->obj.value()["x"].numberInt(), 1); } - ASSERT(cs->isEOF()); + { + WorkingSetID out; + ASSERT_EQ(cs->work(&out), PlanStage::IS_EOF); + } // Flagged results *must* be at the end. 
for (size_t i = 0; i < 10; ++i) { diff --git a/src/mongo/dbtests/query_stage_merge_sort.cpp b/src/mongo/dbtests/query_stage_merge_sort.cpp index 4685c53b084..0c3813e5a37 100644 --- a/src/mongo/dbtests/query_stage_merge_sort.cpp +++ b/src/mongo/dbtests/query_stage_merge_sort.cpp @@ -78,12 +78,10 @@ namespace QueryStageMergeSortTests { } void getLocs(set<RecordId>* out, Collection* coll) { - RecordIterator* it = coll->getIterator(&_txn); - while (!it->isEOF()) { - RecordId nextLoc = it->getNext(); - out->insert(nextLoc); + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + out->insert(record->id); } - delete it; } BSONObj objWithMinKey(int start) { diff --git a/src/mongo/dbtests/query_stage_sort.cpp b/src/mongo/dbtests/query_stage_sort.cpp index e398b696339..dc267b9a8b1 100644 --- a/src/mongo/dbtests/query_stage_sort.cpp +++ b/src/mongo/dbtests/query_stage_sort.cpp @@ -70,12 +70,10 @@ namespace QueryStageSortTests { } void getLocs(set<RecordId>* out, Collection* coll) { - RecordIterator* it = coll->getIterator(&_txn); - while (!it->isEOF()) { - RecordId nextLoc = it->getNext(); - out->insert(nextLoc); + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + out->insert(record->id); } - delete it; } /** diff --git a/src/mongo/dbtests/querytests.cpp b/src/mongo/dbtests/querytests.cpp index d13ad632970..d668cb8e2d4 100644 --- a/src/mongo/dbtests/querytests.cpp +++ b/src/mongo/dbtests/querytests.cpp @@ -1518,10 +1518,25 @@ namespace QueryTests { for( int i = 0; i < 5; ++i ) { insert( ns(), BSONObj() ); } - auto_ptr<DBClientCursor> c = _client.query( ns(), Query(), 5 ); - ASSERT( c->more() ); - // With five results and a batch size of 5, no cursor is created. - ASSERT_EQUALS( 0, c->getCursorId() ); + { + // With five results and a batch size of 5, a cursor is created since we don't know + // there are no more results. + std::auto_ptr<DBClientCursor> c = _client.query( ns(), Query(), 5 ); + ASSERT(c->more()); + ASSERT_NE(0, c->getCursorId()); + for (int i = 0; i < 5; ++i) { + ASSERT(c->more()); + c->next(); + } + ASSERT(!c->more()); + } + { + // With a batchsize of 6 we know there are no more results so we don't create a + // cursor. 
+ std::auto_ptr<DBClientCursor> c = _client.query( ns(), Query(), 6 ); + ASSERT(c->more()); + ASSERT_EQ(0, c->getCursorId()); + } } }; diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index d79495cdd34..74c250fc098 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -149,11 +149,10 @@ namespace ReplTests { } int count = 0; - RecordIterator* it = coll->getIterator(&_txn); - for ( ; !it->isEOF(); it->getNext() ) { + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { ++count; } - delete it; return count; } int opCount() { @@ -170,11 +169,10 @@ namespace ReplTests { } int count = 0; - RecordIterator* it = coll->getIterator(&_txn); - for ( ; !it->isEOF(); it->getNext() ) { + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { ++count; } - delete it; return count; } void applyAllOperations() { @@ -186,12 +184,10 @@ namespace ReplTests { Database* db = ctx.db(); Collection* coll = db->getCollection( cllNS() ); - RecordIterator* it = coll->getIterator(&_txn); - while ( !it->isEOF() ) { - RecordId currLoc = it->getNext(); - ops.push_back(coll->docFor(&_txn, currLoc).value()); + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + ops.push_back(record->data.releaseToBson().getOwned()); } - delete it; } { OldClientContext ctx(&_txn, ns() ); @@ -222,13 +218,11 @@ namespace ReplTests { wunit.commit(); } - RecordIterator* it = coll->getIterator(&_txn); + auto cursor = coll->getCursor(&_txn); ::mongo::log() << "all for " << ns << endl; - while ( !it->isEOF() ) { - RecordId currLoc = it->getNext(); - ::mongo::log() << coll->docFor(&_txn, currLoc).value().toString() << endl; + while (auto record = cursor->next()) { + ::mongo::log() << record->data.releaseToBson() << endl; } - delete it; } // These deletes don't get logged. 
void deleteAll( const char *ns ) const { @@ -243,11 +237,13 @@ namespace ReplTests { } vector< RecordId > toDelete; - RecordIterator* it = coll->getIterator(&_txn); - while ( !it->isEOF() ) { - toDelete.push_back( it->getNext() ); + { + auto cursor = coll->getCursor(&_txn); + while (auto record = cursor->next()) { + toDelete.push_back(record->id); + } } - delete it; + for( vector< RecordId >::iterator i = toDelete.begin(); i != toDelete.end(); ++i ) { _txn.setReplicatedWrites(false); coll->deleteDocument( &_txn, *i, true ); diff --git a/src/mongo/dbtests/rollbacktests.cpp b/src/mongo/dbtests/rollbacktests.cpp index f742fb68b00..74078abf1b1 100644 --- a/src/mongo/dbtests/rollbacktests.cpp +++ b/src/mongo/dbtests/rollbacktests.cpp @@ -99,16 +99,17 @@ namespace { const NamespaceString& nss, const BSONObj& data ) { Collection* coll = dbHolder().get( txn, nss.db() )->getCollection(nss.ns() ); - scoped_ptr<RecordIterator> iter( coll->getIterator( txn ) ); - ASSERT( !iter->isEOF() ); - RecordId loc = iter->getNext(); - ASSERT( iter->isEOF() ); - ASSERT_EQ( data, coll->docFor( txn, loc ).value() ); + auto cursor = coll->getCursor(txn); + + auto record = cursor->next(); + ASSERT(record); + ASSERT_EQ(data, record->data.releaseToBson()); + + ASSERT(!cursor->next()); } void assertEmpty( OperationContext* txn, const NamespaceString& nss ) { Collection* coll = dbHolder().get( txn, nss.db() )->getCollection(nss.ns() ); - scoped_ptr<RecordIterator> iter( coll->getIterator( txn ) ); - ASSERT( iter->isEOF() ); + ASSERT(!coll->getCursor(txn)->next()); } bool indexExists( OperationContext* txn, const NamespaceString& nss, const string& idxName ) { Collection* coll = dbHolder().get( txn, nss.db() )->getCollection(nss.ns() ); |
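Taken together, the call-site changes in the tests above all follow one migration pattern; a condensed before/after sketch (coll and txn are placeholder names):

    // Before: raw RecordIterator owned by the caller.
    RecordIterator* it = coll->getIterator(txn);
    while (!it->isEOF()) {
        RecordId loc = it->getNext();
        RecordData data = it->dataFor(loc);
        // ... use loc and data ...
    }
    delete it;

    // After: unique_ptr-owned RecordCursor; next() returns boost::optional<Record>.
    auto cursor = coll->getCursor(txn);
    while (auto record = cursor->next()) {
        // ... use record->id and record->data ...
    }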