diff options
author | Benety Goh <benety@mongodb.com> | 2014-02-25 21:23:31 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2014-02-27 19:09:23 -0500 |
commit | 5e120d33c761d2dd89ae260751ab1368c512a8d5 (patch) | |
tree | 00928fc54037c9cfd717eb69936eb29fddde5edf /src/mongo | |
parent | 04f7ab3acc4e10edac4e34a7a8a4bc47b1934132 (diff) | |
download | mongo-5e120d33c761d2dd89ae260751ab1368c512a8d5.tar.gz |
SERVER-12868 added memory usage accounting to hashed AND stage
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/exec/and_hash.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/exec/and_hash.h | 21 | ||||
-rw-r--r-- | src/mongo/dbtests/query_stage_and.cpp | 244 |
3 files changed, 332 insertions, 6 deletions
diff --git a/src/mongo/db/exec/and_hash.cpp b/src/mongo/db/exec/and_hash.cpp index 8f60859b58f..d30a2e0123e 100644 --- a/src/mongo/db/exec/and_hash.cpp +++ b/src/mongo/db/exec/and_hash.cpp @@ -34,15 +34,49 @@ #include "mongo/db/exec/working_set.h" #include "mongo/util/mongoutils/str.h" +namespace { + + using namespace mongo; + + // Upper limit for buffered data. + // Stage execution will fail once size of all buffered data exceeds this threshold. + const size_t kDefaultMaxMemUsageBytes = 32 * 1024 * 1024; + + /** + * Returns expected memory usage of working set member + */ + size_t getMemberMemUsage(WorkingSetMember* member) { + size_t memUsage = 0; + for (size_t i = 0; i < member->keyData.size(); ++i) { + const IndexKeyDatum& keyDatum = member->keyData[i]; + memUsage += keyDatum.keyData.objsize(); + } + return memUsage; + } + +} // namespace + namespace mongo { + using std::auto_ptr; + const size_t AndHashStage::kLookAheadWorks = 10; AndHashStage::AndHashStage(WorkingSet* ws, const MatchExpression* filter) : _ws(ws), _filter(filter), _hashingChildren(true), - _currentChild(0) {} + _currentChild(0), + _memUsage(0), + _maxMemUsage(kDefaultMaxMemUsageBytes) {} + + AndHashStage::AndHashStage(WorkingSet* ws, const MatchExpression* filter, size_t maxMemUsage) + : _ws(ws), + _filter(filter), + _hashingChildren(true), + _currentChild(0), + _memUsage(0), + _maxMemUsage(maxMemUsage) {} AndHashStage::~AndHashStage() { for (size_t i = 0; i < _children.size(); ++i) { delete _children[i]; } @@ -50,6 +84,10 @@ namespace mongo { void AndHashStage::addChild(PlanStage* child) { _children.push_back(child); } + size_t AndHashStage::getMemUsage() const { + return _memUsage; + } + bool AndHashStage::isEOF() { // This is empty before calling work() and not-empty after. if (_lookAheadResults.empty()) { return false; } @@ -139,6 +177,16 @@ namespace mongo { // We read the first child into our hash table. if (_hashingChildren) { + // Check memory usage of previously hashed results. 
+ if (_memUsage > _maxMemUsage) { + mongoutils::str::stream ss; + ss << "hashed AND stage buffered data usage of " << _memUsage + << " bytes exceeds internal limit of " << kDefaultMaxMemUsageBytes << " bytes"; + Status status(ErrorCodes::Overflow, ss); + *out = WorkingSetCommon::allocateStatusMember( _ws, status); + return PlanStage::FAILURE; + } + if (0 == _currentChild) { return readFirstChild(out); } @@ -241,6 +289,10 @@ namespace mongo { verify(_dataMap.end() == _dataMap.find(member->loc)); _dataMap[member->loc] = id; + + // Update memory stats. + _memUsage += getMemberMemUsage(member); + ++_commonStats.needTime; return PlanStage::NEED_TIME; } @@ -309,7 +361,12 @@ namespace mongo { // We have a hit. Copy data into the WSM we already have. _seenMap.insert(member->loc); WorkingSetMember* olderMember = _ws->get(_dataMap[member->loc]); + size_t memUsageBefore = getMemberMemUsage(olderMember); + AndCommon::mergeFrom(olderMember, *member); + + // Update memory stats. + _memUsage += getMemberMemUsage(olderMember) - memUsageBefore; } _ws->free(id); ++_commonStats.needTime; @@ -325,6 +382,11 @@ namespace mongo { if (_seenMap.end() == _seenMap.find(it->first)) { DataMap::iterator toErase = it; ++it; + + // Update memory stats. + WorkingSetMember* member = _ws->get(toErase->second); + _memUsage -= getMemberMemUsage(member); + _ws->free(toErase->second); _dataMap.erase(toErase); } @@ -435,6 +497,9 @@ namespace mongo { ++_specificStats.flaggedButPassed; } + // Update memory stats. + _memUsage -= getMemberMemUsage(member); + // The loc is about to be invalidated. Fetch it and clear the loc. 
WorkingSetCommon::fetchAndInvalidateLoc(member); @@ -449,10 +514,8 @@ namespace mongo { PlanStageStats* AndHashStage::getStats() { _commonStats.isEOF = isEOF(); - // XXX: populate - // _specificStats.memLimit - // _specificStats.memUsage - // when ben's change is in + _specificStats.memLimit = _maxMemUsage; + _specificStats.memUsage = _memUsage; auto_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_AND_HASH)); ret->specific.reset(new AndHashStats(_specificStats)); diff --git a/src/mongo/db/exec/and_hash.h b/src/mongo/db/exec/and_hash.h index 6d14bf16c79..a546057ffe2 100644 --- a/src/mongo/db/exec/and_hash.h +++ b/src/mongo/db/exec/and_hash.h @@ -53,10 +53,22 @@ namespace mongo { class AndHashStage : public PlanStage { public: AndHashStage(WorkingSet* ws, const MatchExpression* filter); + + /** + * For testing only. Allows tests to set memory usage threshold. + */ + AndHashStage(WorkingSet* ws, const MatchExpression* filter, size_t maxMemUsage); + virtual ~AndHashStage(); void addChild(PlanStage* child); + /** + * Returns memory usage. + * For testing only. + */ + size_t getMemUsage() const; + virtual StageState work(WorkingSetID* out); virtual bool isEOF(); @@ -106,6 +118,15 @@ namespace mongo { // Stats CommonStats _commonStats; AndHashStats _specificStats; + + // The usage in bytes of all buffered data that we're holding. + // Memory usage is calculated from keys held in _dataMap only. + // For simplicity, results in _lookAheadResults do not count towards the limit. + size_t _memUsage; + + // Upper limit for buffered data memory usage. + // Defaults to 32 MB (See kDefaultMaxMemUsageBytes in and_hash.cpp). 
+ size_t _maxMemUsage; }; } // namespace mongo diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp index 4fbfba45378..b6f7cd8b986 100644 --- a/src/mongo/dbtests/query_stage_and.cpp +++ b/src/mongo/dbtests/query_stage_and.cpp @@ -46,6 +46,7 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/structure/collection_iterator.h" #include "mongo/dbtests/dbtests.h" +#include "mongo/util/mongoutils/str.h" namespace QueryStageAnd { @@ -62,7 +63,11 @@ namespace QueryStageAnd { } IndexDescriptor* getIndex(const BSONObj& obj, Collection* coll) { - return coll->getIndexCatalog()->findIndexByKeyPattern( obj ); + IndexDescriptor* descriptor = coll->getIndexCatalog()->findIndexByKeyPattern( obj ); + if (NULL == descriptor) { + FAIL(mongoutils::str::stream() << "Unable to find index with key pattern " << obj); + } + return descriptor; } void getLocs(set<DiskLoc>* out, Collection* coll) { @@ -83,11 +88,19 @@ namespace QueryStageAnd { _client.remove(ns(), obj); } + /** + * Executes plan stage until EOF. + * Returns number of results seen if execution reaches EOF successfully. + * Otherwise, returns -1 on stage failure. + */ int countResults(PlanStage* stage) { int count = 0; while (!stage->isEOF()) { WorkingSetID id = WorkingSet::INVALID_ID; PlanStage::StageState status = stage->work(&id); + if (PlanStage::FAILURE == status) { + return -1; + } if (PlanStage::ADVANCED != status) { continue; } ++count; } @@ -162,6 +175,7 @@ namespace QueryStageAnd { // ...invalidate one of the read objects set<DiskLoc> data; getLocs(&data, coll); + size_t memUsageBefore = ah->getMemUsage(); for (set<DiskLoc>::const_iterator it = data.begin(); it != data.end(); ++it) { if (it->obj()["foo"].numberInt() == 15) { ah->invalidate(*it, INVALIDATION_DELETION); @@ -169,8 +183,12 @@ namespace QueryStageAnd { break; } } + size_t memUsageAfter = ah->getMemUsage(); ah->recoverFromYield(); + // Invalidating a read object should decrease memory usage. 
+ ASSERT_LESS_THAN(memUsageAfter, memUsageBefore); + // And expect to find foo==15 it flagged for review. const unordered_set<WorkingSetID>& flagged = ws.getFlagged(); ASSERT_EQUALS(size_t(1), flagged.size()); @@ -257,12 +275,19 @@ namespace QueryStageAnd { ah->prepareToYield(); set<DiskLoc> data; getLocs(&data, coll); + + size_t memUsageBefore = ah->getMemUsage(); for (set<DiskLoc>::const_iterator it = data.begin(); it != data.end(); ++it) { if (0 == deletedObj.woCompare(it->obj())) { ah->invalidate(*it, INVALIDATION_DELETION); break; } } + + size_t memUsageAfter = ah->getMemUsage(); + // Look ahead results do not count towards memory usage. + ASSERT_EQUALS(memUsageBefore, memUsageAfter); + ah->recoverFromYield(); // The deleted obj should show up in flagged. @@ -283,6 +308,155 @@ namespace QueryStageAnd { } }; + // An AND with two children. + class QueryStageAndHashTwoLeaf : public QueryStageAndBase { + public: + void run() { + Client::WriteContext ctx(ns()); + Database* db = ctx.ctx().db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + coll = db->createCollection(ns()); + } + + for (int i = 0; i < 50; ++i) { + insert(BSON("foo" << i << "bar" << i)); + } + + addIndex(BSON("foo" << 1)); + addIndex(BSON("bar" << 1)); + + WorkingSet ws; + scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL)); + + // Foo <= 20 + IndexScanParams params; + params.descriptor = getIndex(BSON("foo" << 1), coll); + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 20); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = -1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // Bar >= 10 + params.descriptor = getIndex(BSON("bar" << 1), coll); + params.bounds.startKey = BSON("" << 10); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = 1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // foo == bar == baz, and foo<=20, bar>=10, so our values are: + 
// foo == 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20 + ASSERT_EQUALS(11, countResults(ah.get())); + } + }; + + // An AND with two children. + // Add large keys (512 bytes) to index of first child to cause + // internal buffer within hashed AND to exceed threshold (32MB) + // before gathering all requested results. + class QueryStageAndHashTwoLeafFirstChildLargeKeys : public QueryStageAndBase { + public: + void run() { + Client::WriteContext ctx(ns()); + Database* db = ctx.ctx().db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + coll = db->createCollection(ns()); + } + + // Generate large keys for {foo: 1, big: 1} index. + std::string big(512, 'a'); + for (int i = 0; i < 50; ++i) { + insert(BSON("foo" << i << "bar" << i << "big" << big)); + } + + addIndex(BSON("foo" << 1 << "big" << 1)); + addIndex(BSON("bar" << 1)); + + // Lower buffer limit to 20 * sizeof(big) to force memory error + // before hashed AND is done reading the first child (stage has to + // hold 21 keys in buffer for Foo <= 20). + WorkingSet ws; + scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL, 20 * big.size())); + + // Foo <= 20 + IndexScanParams params; + params.descriptor = getIndex(BSON("foo" << 1 << "big" << 1), coll); + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 20 << "" << big); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = -1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // Bar >= 10 + params.descriptor = getIndex(BSON("bar" << 1), coll); + params.bounds.startKey = BSON("" << 10); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = 1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // Stage execution should fail. + ASSERT_EQUALS(-1, countResults(ah.get())); + } + }; + + // An AND with two children. 
+ // Add large keys (512 bytes) to index of last child to verify that + // keys in last child are not buffered + class QueryStageAndHashTwoLeafLastChildLargeKeys : public QueryStageAndBase { + public: + void run() { + Client::WriteContext ctx(ns()); + Database* db = ctx.ctx().db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + coll = db->createCollection(ns()); + } + + // Generate large keys for {bar: 1, big: 1} index. + std::string big(512, 'a'); + for (int i = 0; i < 50; ++i) { + insert(BSON("foo" << i << "bar" << i << "big" << big)); + } + + addIndex(BSON("foo" << 1)); + addIndex(BSON("bar" << 1 << "big" << 1)); + + // Lower buffer limit to 5 * sizeof(big) to ensure that + // keys in last child's index are not buffered. There are 11 keys + // that satisfy the criteria Foo <= 20 and Bar >= 10. + WorkingSet ws; + scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL, 5 * big.size())); + + // Foo <= 20 + IndexScanParams params; + params.descriptor = getIndex(BSON("foo" << 1), coll); + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 20); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = -1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // Bar >= 10 + params.descriptor = getIndex(BSON("bar" << 1 << "big" << 1), coll); + params.bounds.startKey = BSON("" << 10 << "" << big); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = 1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // foo == bar, and foo<=20, bar>=10, so our values are: + // foo == 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20. + ASSERT_EQUALS(11, countResults(ah.get())); + } + }; + + // An AND with three children. 
+ // Add large keys (512 bytes) to index of second child to cause + // internal buffer within hashed AND to exceed threshold (32MB) + // before gathering all requested results. + // We need 3 children because the hashed AND stage buffered data for + // N-1 of its children. If the second child is the last child, it will not + // be buffered. + class QueryStageAndHashThreeLeafMiddleChildLargeKeys : public QueryStageAndBase { + public: + void run() { + Client::WriteContext ctx(ns()); + Database* db = ctx.ctx().db(); + Collection* coll = db->getCollection(ns()); + if (!coll) { + coll = db->createCollection(ns()); + } + + // Generate large keys for {bar: 1, big: 1} index. + std::string big(512, 'a'); + for (int i = 0; i < 50; ++i) { + insert(BSON("foo" << i << "bar" << i << "baz" << i << "big" << big)); + } + + addIndex(BSON("foo" << 1)); + addIndex(BSON("bar" << 1 << "big" << 1)); + addIndex(BSON("baz" << 1)); + + // Lower buffer limit to 10 * sizeof(big) to force memory error + // before hashed AND is done reading the second child (stage has to + // hold 11 keys in buffer for Foo <= 20 and Bar >= 10). 
+ WorkingSet ws; + scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL, 10 * big.size())); + + // Foo <= 20 + IndexScanParams params; + params.descriptor = getIndex(BSON("foo" << 1), coll); + params.bounds.isSimpleRange = true; + params.bounds.startKey = BSON("" << 20); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = -1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // Bar >= 10 + params.descriptor = getIndex(BSON("bar" << 1 << "big" << 1), coll); + params.bounds.startKey = BSON("" << 10 << "" << big); + params.bounds.endKey = BSONObj(); + params.bounds.endKeyInclusive = true; + params.direction = 1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // 5 <= baz <= 15 + params.descriptor = getIndex(BSON("baz" << 1), coll); + params.bounds.startKey = BSON("" << 5); + params.bounds.endKey = BSON("" << 15); + params.bounds.endKeyInclusive = true; + params.direction = 1; + ah->addChild(new IndexScan(params, &ws, NULL)); + + // Stage execution should fail. + ASSERT_EQUALS(-1, countResults(ah.get())); + } + }; + // An AND with an index scan that returns nothing. class QueryStageAndHashWithNothing : public QueryStageAndBase { public: @@ -865,7 +1103,11 @@ namespace QueryStageAnd { void setupTests() { add<QueryStageAndHashInvalidation>(); + add<QueryStageAndHashTwoLeaf>(); + add<QueryStageAndHashTwoLeafFirstChildLargeKeys>(); + add<QueryStageAndHashTwoLeafLastChildLargeKeys>(); add<QueryStageAndHashThreeLeaf>(); + add<QueryStageAndHashThreeLeafMiddleChildLargeKeys>(); add<QueryStageAndHashWithNothing>(); add<QueryStageAndHashProducesNothing>(); add<QueryStageAndHashWithMatcher>(); |