author    Benety Goh <benety@mongodb.com>  2014-02-25 21:23:31 -0500
committer Benety Goh <benety@mongodb.com>  2014-02-27 19:09:23 -0500
commit    5e120d33c761d2dd89ae260751ab1368c512a8d5 (patch)
tree      00928fc54037c9cfd717eb69936eb29fddde5edf /src/mongo
parent    04f7ab3acc4e10edac4e34a7a8a4bc47b1934132 (diff)
download  mongo-5e120d33c761d2dd89ae260751ab1368c512a8d5.tar.gz
SERVER-12868 added memory usage accounting to hashed AND stage
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/exec/and_hash.cpp        |  73
-rw-r--r-- src/mongo/db/exec/and_hash.h          |  21
-rw-r--r-- src/mongo/dbtests/query_stage_and.cpp | 244
3 files changed, 332 insertions(+), 6 deletions(-)
diff --git a/src/mongo/db/exec/and_hash.cpp b/src/mongo/db/exec/and_hash.cpp
index 8f60859b58f..d30a2e0123e 100644
--- a/src/mongo/db/exec/and_hash.cpp
+++ b/src/mongo/db/exec/and_hash.cpp
@@ -34,15 +34,49 @@
#include "mongo/db/exec/working_set.h"
#include "mongo/util/mongoutils/str.h"
+namespace {
+
+ using namespace mongo;
+
+ // Upper limit for buffered data.
+ // Stage execution will fail once the size of all buffered data exceeds this threshold.
+ const size_t kDefaultMaxMemUsageBytes = 32 * 1024 * 1024;
+
+ /**
+ * Returns the expected memory usage of a working set member.
+ */
+ size_t getMemberMemUsage(WorkingSetMember* member) {
+ size_t memUsage = 0;
+ for (size_t i = 0; i < member->keyData.size(); ++i) {
+ const IndexKeyDatum& keyDatum = member->keyData[i];
+ memUsage += keyDatum.keyData.objsize();
+ }
+ return memUsage;
+ }
+
+} // namespace
+
namespace mongo {
+ using std::auto_ptr;
+
const size_t AndHashStage::kLookAheadWorks = 10;
AndHashStage::AndHashStage(WorkingSet* ws, const MatchExpression* filter)
: _ws(ws),
_filter(filter),
_hashingChildren(true),
- _currentChild(0) {}
+ _currentChild(0),
+ _memUsage(0),
+ _maxMemUsage(kDefaultMaxMemUsageBytes) {}
+
+ AndHashStage::AndHashStage(WorkingSet* ws, const MatchExpression* filter, size_t maxMemUsage)
+ : _ws(ws),
+ _filter(filter),
+ _hashingChildren(true),
+ _currentChild(0),
+ _memUsage(0),
+ _maxMemUsage(maxMemUsage) {}
AndHashStage::~AndHashStage() {
for (size_t i = 0; i < _children.size(); ++i) { delete _children[i]; }
@@ -50,6 +84,10 @@ namespace mongo {
void AndHashStage::addChild(PlanStage* child) { _children.push_back(child); }
+ size_t AndHashStage::getMemUsage() const {
+ return _memUsage;
+ }
+
bool AndHashStage::isEOF() {
// This is empty before calling work() and not-empty after.
if (_lookAheadResults.empty()) { return false; }
@@ -139,6 +177,16 @@ namespace mongo {
// We read the first child into our hash table.
if (_hashingChildren) {
+ // Check memory usage of previously hashed results.
+ if (_memUsage > _maxMemUsage) {
+ mongoutils::str::stream ss;
+ ss << "hashed AND stage buffered data usage of " << _memUsage
+ << " bytes exceeds internal limit of " << kDefaultMaxMemUsageBytes << " bytes";
+ Status status(ErrorCodes::Overflow, ss);
+ *out = WorkingSetCommon::allocateStatusMember(_ws, status);
+ return PlanStage::FAILURE;
+ }
+
if (0 == _currentChild) {
return readFirstChild(out);
}
@@ -241,6 +289,10 @@ namespace mongo {
verify(_dataMap.end() == _dataMap.find(member->loc));
_dataMap[member->loc] = id;
+
+ // Update memory stats.
+ _memUsage += getMemberMemUsage(member);
+
++_commonStats.needTime;
return PlanStage::NEED_TIME;
}
@@ -309,7 +361,12 @@ namespace mongo {
// We have a hit. Copy data into the WSM we already have.
_seenMap.insert(member->loc);
WorkingSetMember* olderMember = _ws->get(_dataMap[member->loc]);
+ size_t memUsageBefore = getMemberMemUsage(olderMember);
+
AndCommon::mergeFrom(olderMember, *member);
+
+ // Update memory stats.
+ _memUsage += getMemberMemUsage(olderMember) - memUsageBefore;
}
_ws->free(id);
++_commonStats.needTime;
@@ -325,6 +382,11 @@ namespace mongo {
if (_seenMap.end() == _seenMap.find(it->first)) {
DataMap::iterator toErase = it;
++it;
+
+ // Update memory stats.
+ WorkingSetMember* member = _ws->get(toErase->second);
+ _memUsage -= getMemberMemUsage(member);
+
_ws->free(toErase->second);
_dataMap.erase(toErase);
}
@@ -435,6 +497,9 @@ namespace mongo {
++_specificStats.flaggedButPassed;
}
+ // Update memory stats.
+ _memUsage -= getMemberMemUsage(member);
+
// The loc is about to be invalidated. Fetch it and clear the loc.
WorkingSetCommon::fetchAndInvalidateLoc(member);
@@ -449,10 +514,8 @@ namespace mongo {
PlanStageStats* AndHashStage::getStats() {
_commonStats.isEOF = isEOF();
- // XXX: populate
- // _specificStats.memLimit
- // _specificStats.memUsage
- // when ben's change is in
+ _specificStats.memLimit = _maxMemUsage;
+ _specificStats.memUsage = _memUsage;
auto_ptr<PlanStageStats> ret(new PlanStageStats(_commonStats, STAGE_AND_HASH));
ret->specific.reset(new AndHashStats(_specificStats));
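
The accounting pattern the patch applies in and_hash.cpp is deliberately simple: add an entry's key size to a running counter when it is buffered in _dataMap, subtract it when the entry is evicted or invalidated, and fail the stage once the counter exceeds the cap. Below is a minimal standalone sketch of that pattern (not part of this commit; BoundedBuffer and its members are hypothetical stand-ins for the stage's _dataMap/_memUsage bookkeeping):

#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the _dataMap/_memUsage bookkeeping: every
// insert adds the entry's size to a running total, every erase subtracts
// it, and an over-limit total is reported on the next insert attempt,
// mirroring the check at the top of AndHashStage::work().
class BoundedBuffer {
public:
    explicit BoundedBuffer(size_t maxBytes) : _memUsage(0), _maxMemUsage(maxBytes) {}

    void insert(int key, const std::string& value) {
        if (_memUsage > _maxMemUsage) {
            throw std::runtime_error("buffered data exceeds internal limit");
        }
        _memUsage += value.size();
        _data[key] = value;
    }

    void erase(int key) {
        std::map<int, std::string>::iterator it = _data.find(key);
        if (_data.end() == it) { return; }
        _memUsage -= it->second.size();  // keep the counter in sync on eviction
        _data.erase(it);
    }

    size_t memUsage() const { return _memUsage; }

private:
    std::map<int, std::string> _data;
    size_t _memUsage;
    size_t _maxMemUsage;
};
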
diff --git a/src/mongo/db/exec/and_hash.h b/src/mongo/db/exec/and_hash.h
index 6d14bf16c79..a546057ffe2 100644
--- a/src/mongo/db/exec/and_hash.h
+++ b/src/mongo/db/exec/and_hash.h
@@ -53,10 +53,22 @@ namespace mongo {
class AndHashStage : public PlanStage {
public:
AndHashStage(WorkingSet* ws, const MatchExpression* filter);
+
+ /**
+ * For testing only. Allows tests to set memory usage threshold.
+ */
+ AndHashStage(WorkingSet* ws, const MatchExpression* filter, size_t maxMemUsage);
+
virtual ~AndHashStage();
void addChild(PlanStage* child);
+ /**
+ * Returns memory usage.
+ * For testing only.
+ */
+ size_t getMemUsage() const;
+
virtual StageState work(WorkingSetID* out);
virtual bool isEOF();
@@ -106,6 +118,15 @@ namespace mongo {
// Stats
CommonStats _commonStats;
AndHashStats _specificStats;
+
+ // The usage in bytes of all buffered data that we're holding.
+ // Memory usage is calculated from keys held in _dataMap only.
+ // For simplicity, results in _lookAheadResults do not count towards the limit.
+ size_t _memUsage;
+
+ // Upper limit for buffered data memory usage.
+ // Defaults to 32 MB (see kDefaultMaxMemUsageBytes in and_hash.cpp).
+ size_t _maxMemUsage;
};
} // namespace mongo
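
One subtlety in the accounting above: when a later child matches an already-buffered member, AndCommon::mergeFrom grows that member in place, so the counter must be adjusted by the size delta rather than by the incoming member's full size. A minimal sketch of that delta pattern (illustrative only; Member and memberMemUsage are hypothetical stand-ins for WorkingSetMember and the getMemberMemUsage helper):

#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a buffered WorkingSetMember and its index keys.
struct Member {
    std::vector<std::string> keys;
};

size_t memberMemUsage(const Member& m) {
    size_t usage = 0;
    for (size_t i = 0; i < m.keys.size(); ++i) {
        usage += m.keys[i].size();
    }
    return usage;
}

// Merging grows the existing member in place, so the running total is
// adjusted by (usage after - usage before), exactly as the patch does
// around AndCommon::mergeFrom.
void mergeAndAccount(Member* older, const Member& incoming, size_t* memUsage) {
    size_t memUsageBefore = memberMemUsage(*older);
    older->keys.insert(older->keys.end(), incoming.keys.begin(), incoming.keys.end());
    *memUsage += memberMemUsage(*older) - memUsageBefore;
}
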
diff --git a/src/mongo/dbtests/query_stage_and.cpp b/src/mongo/dbtests/query_stage_and.cpp
index 4fbfba45378..b6f7cd8b986 100644
--- a/src/mongo/dbtests/query_stage_and.cpp
+++ b/src/mongo/dbtests/query_stage_and.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/structure/collection_iterator.h"
#include "mongo/dbtests/dbtests.h"
+#include "mongo/util/mongoutils/str.h"
namespace QueryStageAnd {
@@ -62,7 +63,11 @@ namespace QueryStageAnd {
}
IndexDescriptor* getIndex(const BSONObj& obj, Collection* coll) {
- return coll->getIndexCatalog()->findIndexByKeyPattern( obj );
+ IndexDescriptor* descriptor = coll->getIndexCatalog()->findIndexByKeyPattern( obj );
+ if (NULL == descriptor) {
+ FAIL(mongoutils::str::stream() << "Unable to find index with key pattern " << obj);
+ }
+ return descriptor;
}
void getLocs(set<DiskLoc>* out, Collection* coll) {
@@ -83,11 +88,19 @@ namespace QueryStageAnd {
_client.remove(ns(), obj);
}
+ /**
+ * Executes the plan stage until EOF.
+ * Returns the number of results seen if execution reaches EOF successfully;
+ * otherwise, returns -1 on stage failure.
+ */
int countResults(PlanStage* stage) {
int count = 0;
while (!stage->isEOF()) {
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState status = stage->work(&id);
+ if (PlanStage::FAILURE == status) {
+ return -1;
+ }
if (PlanStage::ADVANCED != status) { continue; }
++count;
}
@@ -162,6 +175,7 @@ namespace QueryStageAnd {
// ...invalidate one of the read objects
set<DiskLoc> data;
getLocs(&data, coll);
+ size_t memUsageBefore = ah->getMemUsage();
for (set<DiskLoc>::const_iterator it = data.begin(); it != data.end(); ++it) {
if (it->obj()["foo"].numberInt() == 15) {
ah->invalidate(*it, INVALIDATION_DELETION);
@@ -169,8 +183,12 @@ namespace QueryStageAnd {
break;
}
}
+ size_t memUsageAfter = ah->getMemUsage();
ah->recoverFromYield();
+ // Invalidating a read object should decrease memory usage.
+ ASSERT_LESS_THAN(memUsageAfter, memUsageBefore);
+
// And expect to find foo==15 it flagged for review.
const unordered_set<WorkingSetID>& flagged = ws.getFlagged();
ASSERT_EQUALS(size_t(1), flagged.size());
@@ -257,12 +275,19 @@ namespace QueryStageAnd {
ah->prepareToYield();
set<DiskLoc> data;
getLocs(&data, coll);
+
+ size_t memUsageBefore = ah->getMemUsage();
for (set<DiskLoc>::const_iterator it = data.begin(); it != data.end(); ++it) {
if (0 == deletedObj.woCompare(it->obj())) {
ah->invalidate(*it, INVALIDATION_DELETION);
break;
}
}
+
+ size_t memUsageAfter = ah->getMemUsage();
+ // Look ahead results do not count towards memory usage.
+ ASSERT_EQUALS(memUsageBefore, memUsageAfter);
+
ah->recoverFromYield();
// The deleted obj should show up in flagged.
@@ -283,6 +308,155 @@ namespace QueryStageAnd {
}
};
+ // An AND with two children.
+ class QueryStageAndHashTwoLeaf : public QueryStageAndBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ Database* db = ctx.ctx().db();
+ Collection* coll = db->getCollection(ns());
+ if (!coll) {
+ coll = db->createCollection(ns());
+ }
+
+ for (int i = 0; i < 50; ++i) {
+ insert(BSON("foo" << i << "bar" << i));
+ }
+
+ addIndex(BSON("foo" << 1));
+ addIndex(BSON("bar" << 1));
+
+ WorkingSet ws;
+ scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL));
+
+ // Foo <= 20
+ IndexScanParams params;
+ params.descriptor = getIndex(BSON("foo" << 1), coll);
+ params.bounds.isSimpleRange = true;
+ params.bounds.startKey = BSON("" << 20);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = -1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // Bar >= 10
+ params.descriptor = getIndex(BSON("bar" << 1), coll);
+ params.bounds.startKey = BSON("" << 10);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // foo == bar, and foo <= 20, bar >= 10, so our values are:
+ // foo == 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20.
+ ASSERT_EQUALS(11, countResults(ah.get()));
+ }
+ };
+
+ // An AND with two children.
+ // Add large keys (512 bytes) to the index of the first child to cause
+ // the internal buffer within the hashed AND stage to exceed its memory
+ // threshold before gathering all requested results.
+ class QueryStageAndHashTwoLeafFirstChildLargeKeys : public QueryStageAndBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ Database* db = ctx.ctx().db();
+ Collection* coll = db->getCollection(ns());
+ if (!coll) {
+ coll = db->createCollection(ns());
+ }
+
+ // Generate large keys for {foo: 1, big: 1} index.
+ std::string big(512, 'a');
+ for (int i = 0; i < 50; ++i) {
+ insert(BSON("foo" << i << "bar" << i << "big" << big));
+ }
+
+ addIndex(BSON("foo" << 1 << "big" << 1));
+ addIndex(BSON("bar" << 1));
+
+ // Lower buffer limit to 20 * big.size() to force a memory usage error
+ // before the hashed AND is done reading the first child (the stage has
+ // to hold 21 keys in its buffer for Foo <= 20).
+ WorkingSet ws;
+ scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL, 20 * big.size()));
+
+ // Foo <= 20
+ IndexScanParams params;
+ params.descriptor = getIndex(BSON("foo" << 1 << "big" << 1), coll);
+ params.bounds.isSimpleRange = true;
+ params.bounds.startKey = BSON("" << 20 << "" << big);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = -1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // Bar >= 10
+ params.descriptor = getIndex(BSON("bar" << 1), coll);
+ params.bounds.startKey = BSON("" << 10);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // Stage execution should fail.
+ ASSERT_EQUALS(-1, countResults(ah.get()));
+ }
+ };
+
+ // An AND with two children.
+ // Add large keys (512 bytes) to the index of the last child to verify
+ // that keys in the last child are not buffered.
+ class QueryStageAndHashTwoLeafLastChildLargeKeys : public QueryStageAndBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ Database* db = ctx.ctx().db();
+ Collection* coll = db->getCollection(ns());
+ if (!coll) {
+ coll = db->createCollection(ns());
+ }
+
+ // Generate large keys for {baz: 1, big: 1} index.
+ std::string big(512, 'a');
+ for (int i = 0; i < 50; ++i) {
+ insert(BSON("foo" << i << "bar" << i << "big" << big));
+ }
+
+ addIndex(BSON("foo" << 1));
+ addIndex(BSON("bar" << 1 << "big" << 1));
+
+ // Lower buffer limit to 5 * big.size() to ensure that keys in the
+ // last child's index are not buffered. There are 11 keys that satisfy
+ // the criteria Foo <= 20 and Bar >= 10; if the last child's large keys
+ // were buffered, the limit would be exceeded and the stage would fail.
+ WorkingSet ws;
+ scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL, 5 * big.size()));
+
+ // Foo <= 20
+ IndexScanParams params;
+ params.descriptor = getIndex(BSON("foo" << 1), coll);
+ params.bounds.isSimpleRange = true;
+ params.bounds.startKey = BSON("" << 20);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = -1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // Bar >= 10
+ params.descriptor = getIndex(BSON("bar" << 1 << "big" << 1), coll);
+ params.bounds.startKey = BSON("" << 10 << "" << big);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // foo == bar, and foo <= 20, bar >= 10, so our values are:
+ // foo == 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20.
+ ASSERT_EQUALS(11, countResults(ah.get()));
+ }
+ };
+
// An AND with three children.
class QueryStageAndHashThreeLeaf : public QueryStageAndBase {
public:
@@ -337,6 +511,70 @@ namespace QueryStageAnd {
}
};
+ // An AND with three children.
+ // Add large keys (512 bytes) to the index of the second child to cause
+ // the internal buffer within the hashed AND stage to exceed its memory
+ // threshold before gathering all requested results.
+ // We need 3 children because the hashed AND stage buffers data for
+ // N-1 of its children. If the second child were the last child, it would
+ // not be buffered.
+ class QueryStageAndHashThreeLeafMiddleChildLargeKeys : public QueryStageAndBase {
+ public:
+ void run() {
+ Client::WriteContext ctx(ns());
+ Database* db = ctx.ctx().db();
+ Collection* coll = db->getCollection(ns());
+ if (!coll) {
+ coll = db->createCollection(ns());
+ }
+
+ // Generate large keys for {bar: 1, big: 1} index.
+ std::string big(512, 'a');
+ for (int i = 0; i < 50; ++i) {
+ insert(BSON("foo" << i << "bar" << i << "baz" << i << "big" << big));
+ }
+
+ addIndex(BSON("foo" << 1));
+ addIndex(BSON("bar" << 1 << "big" << 1));
+ addIndex(BSON("baz" << 1));
+
+ // Lower buffer limit to 10 * big.size() to force a memory usage error
+ // before the hashed AND is done reading the second child (the stage has
+ // to hold 11 keys in its buffer for Foo <= 20 and Bar >= 10).
+ WorkingSet ws;
+ scoped_ptr<AndHashStage> ah(new AndHashStage(&ws, NULL, 10 * big.size()));
+
+ // Foo <= 20
+ IndexScanParams params;
+ params.descriptor = getIndex(BSON("foo" << 1), coll);
+ params.bounds.isSimpleRange = true;
+ params.bounds.startKey = BSON("" << 20);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = -1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // Bar >= 10
+ params.descriptor = getIndex(BSON("bar" << 1 << "big" << 1), coll);
+ params.bounds.startKey = BSON("" << 10 << "" << big);
+ params.bounds.endKey = BSONObj();
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // 5 <= baz <= 15
+ params.descriptor = getIndex(BSON("baz" << 1), coll);
+ params.bounds.startKey = BSON("" << 5);
+ params.bounds.endKey = BSON("" << 15);
+ params.bounds.endKeyInclusive = true;
+ params.direction = 1;
+ ah->addChild(new IndexScan(params, &ws, NULL));
+
+ // Stage execution should fail.
+ ASSERT_EQUALS(-1, countResults(ah.get()));
+ }
+ };
+
// An AND with an index scan that returns nothing.
class QueryStageAndHashWithNothing : public QueryStageAndBase {
public:
@@ -865,7 +1103,11 @@ namespace QueryStageAnd {
void setupTests() {
add<QueryStageAndHashInvalidation>();
+ add<QueryStageAndHashTwoLeaf>();
+ add<QueryStageAndHashTwoLeafFirstChildLargeKeys>();
+ add<QueryStageAndHashTwoLeafLastChildLargeKeys>();
add<QueryStageAndHashThreeLeaf>();
+ add<QueryStageAndHashThreeLeafMiddleChildLargeKeys>();
add<QueryStageAndHashWithNothing>();
add<QueryStageAndHashProducesNothing>();
add<QueryStageAndHashWithMatcher>();
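
The large-key tests above choose their limits so the failure fires at a predictable point: each buffered compound key embeds the 512-byte "big" string, so it is at least 512 bytes. A back-of-the-envelope check of that arithmetic (illustrative only, not part of the test suite):

#include <cassert>
#include <cstddef>

int main() {
    const size_t bigSize = 512;               // length of the "big" string
    const size_t limit = 20 * bigSize;        // maxMemUsage passed to the stage
    const size_t bufferedKeys = 21;           // foo in [0, 20] from the first child
    assert(bufferedKeys * bigSize > limit);   // so the Overflow check must fire
    return 0;
}
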