diff options
Diffstat (limited to 'src/mongo/db/exec/near.cpp')
-rw-r--r-- | src/mongo/db/exec/near.cpp | 556 |
1 files changed, 265 insertions, 291 deletions
diff --git a/src/mongo/db/exec/near.cpp b/src/mongo/db/exec/near.cpp index f7c1c9035d3..515120d86a6 100644 --- a/src/mongo/db/exec/near.cpp +++ b/src/mongo/db/exec/near.cpp @@ -36,360 +36,334 @@ namespace mongo { - using std::vector; - - NearStage::NearStage(OperationContext* txn, - WorkingSet* workingSet, - Collection* collection, - PlanStageStats* stats) - : _txn(txn), - _workingSet(workingSet), - _collection(collection), - _searchState(SearchState_Initializing), - _stats(stats), - _nextInterval(NULL) { - - // Ensure we have specific distance search stats unless a child class specified their - // own distance stats subclass - if (!_stats->specific) { - _stats->specific.reset(new NearStats); - } +using std::vector; + +NearStage::NearStage(OperationContext* txn, + WorkingSet* workingSet, + Collection* collection, + PlanStageStats* stats) + : _txn(txn), + _workingSet(workingSet), + _collection(collection), + _searchState(SearchState_Initializing), + _stats(stats), + _nextInterval(NULL) { + // Ensure we have specific distance search stats unless a child class specified their + // own distance stats subclass + if (!_stats->specific) { + _stats->specific.reset(new NearStats); } - - NearStage::~NearStage() { +} + +NearStage::~NearStage() {} + +NearStage::CoveredInterval::CoveredInterval(PlanStage* covering, + bool dedupCovering, + double minDistance, + double maxDistance, + bool inclusiveMax) + : covering(covering), + dedupCovering(dedupCovering), + minDistance(minDistance), + maxDistance(maxDistance), + inclusiveMax(inclusiveMax) {} + + +PlanStage::StageState NearStage::initNext(WorkingSetID* out) { + PlanStage::StageState state = initialize(_txn, _workingSet, _collection, out); + if (state == PlanStage::IS_EOF) { + _searchState = SearchState_Buffering; + return PlanStage::NEED_TIME; } - NearStage::CoveredInterval::CoveredInterval(PlanStage* covering, - bool dedupCovering, - double minDistance, - double maxDistance, - bool inclusiveMax) : - covering(covering), - dedupCovering(dedupCovering), - minDistance(minDistance), - maxDistance(maxDistance), - inclusiveMax(inclusiveMax) { - } + invariant(state != PlanStage::ADVANCED); + // Propagate NEED_TIME or errors upward. + return state; +} - PlanStage::StageState NearStage::initNext(WorkingSetID* out) { - PlanStage::StageState state = initialize(_txn, _workingSet, _collection, out); - if (state == PlanStage::IS_EOF) { - _searchState = SearchState_Buffering; - return PlanStage::NEED_TIME; - } +PlanStage::StageState NearStage::work(WorkingSetID* out) { + ++_stats->common.works; - invariant(state != PlanStage::ADVANCED); + // Adds the amount of time taken by work() to executionTimeMillis. + ScopedTimer timer(&_stats->common.executionTimeMillis); - // Propagate NEED_TIME or errors upward. - return state; - } + WorkingSetID toReturn = WorkingSet::INVALID_ID; + Status error = Status::OK(); + PlanStage::StageState nextState = PlanStage::NEED_TIME; - PlanStage::StageState NearStage::work(WorkingSetID* out) { + // + // Work the search + // - ++_stats->common.works; + if (SearchState_Initializing == _searchState) { + nextState = initNext(&toReturn); + } else if (SearchState_Buffering == _searchState) { + nextState = bufferNext(&toReturn, &error); + } else if (SearchState_Advancing == _searchState) { + nextState = advanceNext(&toReturn); + } else { + invariant(SearchState_Finished == _searchState); + nextState = PlanStage::IS_EOF; + } - // Adds the amount of time taken by work() to executionTimeMillis. - ScopedTimer timer(&_stats->common.executionTimeMillis); + // + // Handle the results + // + + if (PlanStage::FAILURE == nextState) { + *out = WorkingSetCommon::allocateStatusMember(_workingSet, error); + } else if (PlanStage::ADVANCED == nextState) { + *out = toReturn; + ++_stats->common.advanced; + } else if (PlanStage::NEED_YIELD == nextState) { + *out = toReturn; + ++_stats->common.needYield; + } else if (PlanStage::NEED_TIME == nextState) { + ++_stats->common.needTime; + } else if (PlanStage::IS_EOF == nextState) { + _stats->common.isEOF = true; + } - WorkingSetID toReturn = WorkingSet::INVALID_ID; - Status error = Status::OK(); - PlanStage::StageState nextState = PlanStage::NEED_TIME; + return nextState; +} - // - // Work the search - // +/** + * Holds a generic search result with a distance computed in some fashion. + */ +struct NearStage::SearchResult { + SearchResult(WorkingSetID resultID, double distance) : resultID(resultID), distance(distance) {} - if (SearchState_Initializing == _searchState) { - nextState = initNext(&toReturn); - } - else if (SearchState_Buffering == _searchState) { - nextState = bufferNext(&toReturn, &error); - } - else if (SearchState_Advancing == _searchState) { - nextState = advanceNext(&toReturn); - } - else { - invariant(SearchState_Finished == _searchState); - nextState = PlanStage::IS_EOF; - } + bool operator<(const SearchResult& other) const { + // We want increasing distance, not decreasing, so we reverse the < + return distance > other.distance; + } - // - // Handle the results - // + WorkingSetID resultID; + double distance; +}; - if (PlanStage::FAILURE == nextState) { - *out = WorkingSetCommon::allocateStatusMember(_workingSet, error); - } - else if (PlanStage::ADVANCED == nextState) { - *out = toReturn; - ++_stats->common.advanced; - } - else if (PlanStage::NEED_YIELD == nextState) { - *out = toReturn; - ++_stats->common.needYield; - } - else if (PlanStage::NEED_TIME == nextState) { - ++_stats->common.needTime; +// Set "toReturn" when NEED_YIELD. +PlanStage::StageState NearStage::bufferNext(WorkingSetID* toReturn, Status* error) { + // + // Try to retrieve the next covered member + // + + if (!_nextInterval) { + StatusWith<CoveredInterval*> intervalStatus = nextInterval(_txn, _workingSet, _collection); + if (!intervalStatus.isOK()) { + _searchState = SearchState_Finished; + *error = intervalStatus.getStatus(); + return PlanStage::FAILURE; } - else if (PlanStage::IS_EOF == nextState) { - _stats->common.isEOF = true; + + if (NULL == intervalStatus.getValue()) { + _searchState = SearchState_Finished; + return PlanStage::IS_EOF; } - return nextState; + // CoveredInterval and its child stage are owned by _childrenIntervals + _childrenIntervals.push_back(intervalStatus.getValue()); + _nextInterval = _childrenIntervals.back(); + _nextIntervalStats.reset(new IntervalStats()); + _nextIntervalStats->minDistanceAllowed = _nextInterval->minDistance; + _nextIntervalStats->maxDistanceAllowed = _nextInterval->maxDistance; + _nextIntervalStats->inclusiveMaxDistanceAllowed = _nextInterval->inclusiveMax; } - /** - * Holds a generic search result with a distance computed in some fashion. - */ - struct NearStage::SearchResult { - - SearchResult(WorkingSetID resultID, double distance) : - resultID(resultID), distance(distance) { - } + WorkingSetID nextMemberID; + PlanStage::StageState intervalState = _nextInterval->covering->work(&nextMemberID); - bool operator<(const SearchResult& other) const { - // We want increasing distance, not decreasing, so we reverse the < - return distance > other.distance; - } + if (PlanStage::IS_EOF == intervalState) { + getNearStats()->intervalStats.push_back(*_nextIntervalStats); + _nextIntervalStats.reset(); + _nextInterval = NULL; + _searchState = SearchState_Advancing; + return PlanStage::NEED_TIME; + } else if (PlanStage::FAILURE == intervalState) { + *error = WorkingSetCommon::getMemberStatus(*_workingSet->get(nextMemberID)); + return intervalState; + } else if (PlanStage::NEED_YIELD == intervalState) { + *toReturn = nextMemberID; + return intervalState; + } else if (PlanStage::ADVANCED != intervalState) { + return intervalState; + } - WorkingSetID resultID; - double distance; - }; - - // Set "toReturn" when NEED_YIELD. - PlanStage::StageState NearStage::bufferNext(WorkingSetID* toReturn, Status* error) { - - // - // Try to retrieve the next covered member - // - - if (!_nextInterval) { - - StatusWith<CoveredInterval*> intervalStatus = nextInterval(_txn, - _workingSet, - _collection); - if (!intervalStatus.isOK()) { - _searchState = SearchState_Finished; - *error = intervalStatus.getStatus(); - return PlanStage::FAILURE; - } - - if (NULL == intervalStatus.getValue()) { - _searchState = SearchState_Finished; - return PlanStage::IS_EOF; - } - - // CoveredInterval and its child stage are owned by _childrenIntervals - _childrenIntervals.push_back(intervalStatus.getValue()); - _nextInterval = _childrenIntervals.back(); - _nextIntervalStats.reset(new IntervalStats()); - _nextIntervalStats->minDistanceAllowed = _nextInterval->minDistance; - _nextIntervalStats->maxDistanceAllowed = _nextInterval->maxDistance; - _nextIntervalStats->inclusiveMaxDistanceAllowed = _nextInterval->inclusiveMax; - } + // + // Try to buffer the next covered member + // - WorkingSetID nextMemberID; - PlanStage::StageState intervalState = _nextInterval->covering->work(&nextMemberID); + WorkingSetMember* nextMember = _workingSet->get(nextMemberID); - if (PlanStage::IS_EOF == intervalState) { - getNearStats()->intervalStats.push_back(*_nextIntervalStats); - _nextIntervalStats.reset(); - _nextInterval = NULL; - _searchState = SearchState_Advancing; + // The child stage may not dedup so we must dedup them ourselves. + if (_nextInterval->dedupCovering && nextMember->hasLoc()) { + if (_nextIntervalSeen.end() != _nextIntervalSeen.find(nextMember->loc)) { + _workingSet->free(nextMemberID); return PlanStage::NEED_TIME; } - else if (PlanStage::FAILURE == intervalState) { - *error = WorkingSetCommon::getMemberStatus(*_workingSet->get(nextMemberID)); - return intervalState; - } - else if (PlanStage::NEED_YIELD == intervalState) { - *toReturn = nextMemberID; - return intervalState; - } - else if (PlanStage::ADVANCED != intervalState) { - return intervalState; - } - - // - // Try to buffer the next covered member - // + } - WorkingSetMember* nextMember = _workingSet->get(nextMemberID); + ++_nextIntervalStats->numResultsFound; - // The child stage may not dedup so we must dedup them ourselves. - if (_nextInterval->dedupCovering && nextMember->hasLoc()) { - if (_nextIntervalSeen.end() != _nextIntervalSeen.find(nextMember->loc)) { - _workingSet->free(nextMemberID); - return PlanStage::NEED_TIME; - } - } + StatusWith<double> distanceStatus = computeDistance(nextMember); - ++_nextIntervalStats->numResultsFound; + if (!distanceStatus.isOK()) { + _searchState = SearchState_Finished; + *error = distanceStatus.getStatus(); + return PlanStage::FAILURE; + } - StatusWith<double> distanceStatus = computeDistance(nextMember); + // If the member's distance is in the current distance interval, add it to our buffered + // results. + double memberDistance = distanceStatus.getValue(); + bool inInterval = memberDistance >= _nextInterval->minDistance && + (_nextInterval->inclusiveMax ? memberDistance <= _nextInterval->maxDistance + : memberDistance < _nextInterval->maxDistance); + + // Update found distance stats + if (_nextIntervalStats->minDistanceFound < 0 || + memberDistance < _nextIntervalStats->minDistanceFound) { + _nextIntervalStats->minDistanceFound = memberDistance; + } - if (!distanceStatus.isOK()) { - _searchState = SearchState_Finished; - *error = distanceStatus.getStatus(); - return PlanStage::FAILURE; - } + if (_nextIntervalStats->maxDistanceFound < 0 || + memberDistance > _nextIntervalStats->maxDistanceFound) { + _nextIntervalStats->maxDistanceFound = memberDistance; + } - // If the member's distance is in the current distance interval, add it to our buffered - // results. - double memberDistance = distanceStatus.getValue(); - bool inInterval = memberDistance >= _nextInterval->minDistance - && (_nextInterval->inclusiveMax ? - memberDistance <= _nextInterval->maxDistance : - memberDistance < _nextInterval->maxDistance); - - // Update found distance stats - if (_nextIntervalStats->minDistanceFound < 0 - || memberDistance < _nextIntervalStats->minDistanceFound) { - _nextIntervalStats->minDistanceFound = memberDistance; - } + if (inInterval) { + _resultBuffer.push(SearchResult(nextMemberID, memberDistance)); - if (_nextIntervalStats->maxDistanceFound < 0 - || memberDistance > _nextIntervalStats->maxDistanceFound) { - _nextIntervalStats->maxDistanceFound = memberDistance; + // Store the member's RecordId, if available, for quick invalidation + if (nextMember->hasLoc()) { + _nextIntervalSeen.insert(std::make_pair(nextMember->loc, nextMemberID)); } - if (inInterval) { - _resultBuffer.push(SearchResult(nextMemberID, memberDistance)); - - // Store the member's RecordId, if available, for quick invalidation - if (nextMember->hasLoc()) { - _nextIntervalSeen.insert(std::make_pair(nextMember->loc, nextMemberID)); - } + ++_nextIntervalStats->numResultsBuffered; - ++_nextIntervalStats->numResultsBuffered; - - // Update buffered distance stats - if (_nextIntervalStats->minDistanceBuffered < 0 - || memberDistance < _nextIntervalStats->minDistanceBuffered) { - _nextIntervalStats->minDistanceBuffered = memberDistance; - } - - if (_nextIntervalStats->maxDistanceBuffered < 0 - || memberDistance > _nextIntervalStats->maxDistanceBuffered) { - _nextIntervalStats->maxDistanceBuffered = memberDistance; - } - } - else { - _workingSet->free(nextMemberID); + // Update buffered distance stats + if (_nextIntervalStats->minDistanceBuffered < 0 || + memberDistance < _nextIntervalStats->minDistanceBuffered) { + _nextIntervalStats->minDistanceBuffered = memberDistance; } - return PlanStage::NEED_TIME; + if (_nextIntervalStats->maxDistanceBuffered < 0 || + memberDistance > _nextIntervalStats->maxDistanceBuffered) { + _nextIntervalStats->maxDistanceBuffered = memberDistance; + } + } else { + _workingSet->free(nextMemberID); } - PlanStage::StageState NearStage::advanceNext(WorkingSetID* toReturn) { - - if (_resultBuffer.empty()) { - - // We're done returning the documents buffered for this annulus, so we can - // clear out our buffered RecordIds. - _nextIntervalSeen.clear(); - _searchState = SearchState_Buffering; - return PlanStage::NEED_TIME; - } + return PlanStage::NEED_TIME; +} - *toReturn = _resultBuffer.top().resultID; - _resultBuffer.pop(); +PlanStage::StageState NearStage::advanceNext(WorkingSetID* toReturn) { + if (_resultBuffer.empty()) { + // We're done returning the documents buffered for this annulus, so we can + // clear out our buffered RecordIds. + _nextIntervalSeen.clear(); + _searchState = SearchState_Buffering; + return PlanStage::NEED_TIME; + } - // If we're returning something, take it out of our RecordId -> WSID map so that future - // calls to invalidate don't cause us to take action for a RecordId we're done with. - WorkingSetMember* member = _workingSet->get(*toReturn); - if (member->hasLoc()) { - _nextIntervalSeen.erase(member->loc); - } + *toReturn = _resultBuffer.top().resultID; + _resultBuffer.pop(); - return PlanStage::ADVANCED; + // If we're returning something, take it out of our RecordId -> WSID map so that future + // calls to invalidate don't cause us to take action for a RecordId we're done with. + WorkingSetMember* member = _workingSet->get(*toReturn); + if (member->hasLoc()) { + _nextIntervalSeen.erase(member->loc); } - bool NearStage::isEOF() { - return SearchState_Finished == _searchState; - } + return PlanStage::ADVANCED; +} - void NearStage::saveState() { - _txn = NULL; - ++_stats->common.yields; - for (size_t i = 0; i < _childrenIntervals.size(); i++) { - _childrenIntervals[i]->covering->saveState(); - } +bool NearStage::isEOF() { + return SearchState_Finished == _searchState; +} - // Subclass specific saving, e.g. saving the 2d or 2dsphere density estimator. - finishSaveState(); +void NearStage::saveState() { + _txn = NULL; + ++_stats->common.yields; + for (size_t i = 0; i < _childrenIntervals.size(); i++) { + _childrenIntervals[i]->covering->saveState(); } - void NearStage::restoreState(OperationContext* opCtx) { - invariant(_txn == NULL); - _txn = opCtx; - ++_stats->common.unyields; - for (size_t i = 0; i < _childrenIntervals.size(); i++) { - _childrenIntervals[i]->covering->restoreState(opCtx); - } + // Subclass specific saving, e.g. saving the 2d or 2dsphere density estimator. + finishSaveState(); +} - // Subclass specific restoring, e.g. restoring the 2d or 2dsphere density estimator. - finishRestoreState(opCtx); +void NearStage::restoreState(OperationContext* opCtx) { + invariant(_txn == NULL); + _txn = opCtx; + ++_stats->common.unyields; + for (size_t i = 0; i < _childrenIntervals.size(); i++) { + _childrenIntervals[i]->covering->restoreState(opCtx); } - void NearStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { - ++_stats->common.invalidates; - for (size_t i = 0; i < _childrenIntervals.size(); i++) { - _childrenIntervals[i]->covering->invalidate(txn, dl, type); - } - - // If a result is in _resultBuffer and has a RecordId it will be in _nextIntervalSeen as - // well. It's safe to return the result w/o the RecordId, so just fetch the result. - unordered_map<RecordId, WorkingSetID, RecordId::Hasher>::iterator seenIt = _nextIntervalSeen - .find(dl); + // Subclass specific restoring, e.g. restoring the 2d or 2dsphere density estimator. + finishRestoreState(opCtx); +} - if (seenIt != _nextIntervalSeen.end()) { +void NearStage::invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) { + ++_stats->common.invalidates; + for (size_t i = 0; i < _childrenIntervals.size(); i++) { + _childrenIntervals[i]->covering->invalidate(txn, dl, type); + } - WorkingSetMember* member = _workingSet->get(seenIt->second); - verify(member->hasLoc()); - WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); - verify(!member->hasLoc()); + // If a result is in _resultBuffer and has a RecordId it will be in _nextIntervalSeen as + // well. It's safe to return the result w/o the RecordId, so just fetch the result. + unordered_map<RecordId, WorkingSetID, RecordId::Hasher>::iterator seenIt = + _nextIntervalSeen.find(dl); - // Don't keep it around in the seen map since there's no valid RecordId anymore - _nextIntervalSeen.erase(seenIt); - } + if (seenIt != _nextIntervalSeen.end()) { + WorkingSetMember* member = _workingSet->get(seenIt->second); + verify(member->hasLoc()); + WorkingSetCommon::fetchAndInvalidateLoc(txn, member, _collection); + verify(!member->hasLoc()); - // Subclass specific invalidation, e.g. passing the invalidation to the 2d or 2dsphere - // density estimator. - finishInvalidate(txn, dl, type); + // Don't keep it around in the seen map since there's no valid RecordId anymore + _nextIntervalSeen.erase(seenIt); } - vector<PlanStage*> NearStage::getChildren() const { - vector<PlanStage*> children; - for (size_t i = 0; i < _childrenIntervals.size(); i++) { - children.push_back(_childrenIntervals[i]->covering.get()); - } - return children; - } + // Subclass specific invalidation, e.g. passing the invalidation to the 2d or 2dsphere + // density estimator. + finishInvalidate(txn, dl, type); +} - PlanStageStats* NearStage::getStats() { - PlanStageStats* statsClone = _stats->clone(); - for (size_t i = 0; i < _childrenIntervals.size(); ++i) { - statsClone->children.push_back(_childrenIntervals[i]->covering->getStats()); - } - return statsClone; +vector<PlanStage*> NearStage::getChildren() const { + vector<PlanStage*> children; + for (size_t i = 0; i < _childrenIntervals.size(); i++) { + children.push_back(_childrenIntervals[i]->covering.get()); } + return children; +} - StageType NearStage::stageType() const { - return _stats->stageType; +PlanStageStats* NearStage::getStats() { + PlanStageStats* statsClone = _stats->clone(); + for (size_t i = 0; i < _childrenIntervals.size(); ++i) { + statsClone->children.push_back(_childrenIntervals[i]->covering->getStats()); } + return statsClone; +} - const CommonStats* NearStage::getCommonStats() const { - return &_stats->common; - } +StageType NearStage::stageType() const { + return _stats->stageType; +} - const SpecificStats* NearStage::getSpecificStats() const { - return _stats->specific.get(); - } +const CommonStats* NearStage::getCommonStats() const { + return &_stats->common; +} - NearStats* NearStage::getNearStats() { - return static_cast<NearStats*>(_stats->specific.get()); - } +const SpecificStats* NearStage::getSpecificStats() const { + return _stats->specific.get(); +} + +NearStats* NearStage::getNearStats() { + return static_cast<NearStats*>(_stats->specific.get()); +} -} // namespace mongo +} // namespace mongo |