/**
* Copyright (C) 2013-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_scan.h"
#include "mongo/db/exec/merge_sort.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/json.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/stdx/memory.h"
/**
* This file tests db/exec/merge_sort.cpp
*/
namespace QueryStageMergeSortTests {
using std::set;
using std::string;
using std::unique_ptr;
using stdx::make_unique;
class QueryStageMergeSortTestBase {
public:
QueryStageMergeSortTestBase() : _client(&_txn) {}
virtual ~QueryStageMergeSortTestBase() {
OldClientWriteContext ctx(&_txn, ns());
_client.dropCollection(ns());
}
void addIndex(const BSONObj& obj) {
ASSERT_OK(dbtests::createIndex(&_txn, ns(), obj));
}
IndexDescriptor* getIndex(const BSONObj& obj, Collection* coll) {
std::vector indexes;
coll->getIndexCatalog()->findIndexesByKeyPattern(&_txn, obj, false, &indexes);
return indexes.empty() ? nullptr : indexes[0];
}
void insert(const BSONObj& obj) {
_client.insert(ns(), obj);
}
void remove(const BSONObj& obj) {
_client.remove(ns(), obj);
}
void getRecordIds(set* out, Collection* coll) {
auto cursor = coll->getCursor(&_txn);
while (auto record = cursor->next()) {
out->insert(record->id);
}
}
BSONObj objWithMinKey(int start) {
BSONObjBuilder startKeyBob;
startKeyBob.append("", start);
startKeyBob.appendMinKey("");
return startKeyBob.obj();
}
BSONObj objWithMaxKey(int start) {
BSONObjBuilder endKeyBob;
endKeyBob.append("", start);
endKeyBob.appendMaxKey("");
return endKeyBob.obj();
}
static const char* ns() {
return "unittests.QueryStageMergeSort";
}
protected:
const ServiceContext::UniqueOperationContext _txnPtr = cc().makeOperationContext();
OperationContext& _txn = *_txnPtr;
private:
DBDirectClient _client;
};
// SERVER-1205:
// find($or[{a:1}, {b:1}]).sort({c:1}) with indices {a:1, c:1} and {b:1, c:1}.
class QueryStageMergeSortPrefixIndex : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
insert(BSON("a" << 1 << "c" << i));
insert(BSON("b" << 1 << "c" << i));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << 1);
BSONObj secondIndex = BSON("b" << 1 << "c" << 1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:1
MergeSortStageParams msparams;
msparams.pattern = BSON("c" << 1);
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
// Must fetch if we want to easily pull out an obj.
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < N; ++i) {
BSONObj first, second;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
first = first.getOwned();
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b")) ||
(first.hasField("b") && second.hasField("a")));
}
// Should be done now.
BSONObj foo;
ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&foo, NULL));
}
};
// Each inserted document appears in both indices but is deduped and returned properly/sorted.
class QueryStageMergeSortDups : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
insert(BSON("a" << 1 << "b" << 1 << "c" << i));
insert(BSON("a" << 1 << "b" << 1 << "c" << i));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << 1);
BSONObj secondIndex = BSON("b" << 1 << "c" << 1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:1
MergeSortStageParams msparams;
msparams.pattern = BSON("c" << 1);
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < N; ++i) {
BSONObj first, second;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
first = first.getOwned();
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b")) ||
(first.hasField("b") && second.hasField("a")));
}
// Should be done now.
BSONObj foo;
ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
// Each inserted document appears in both indices, no deduping, get each result twice.
class QueryStageMergeSortDupsNoDedup : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
insert(BSON("a" << 1 << "b" << 1 << "c" << i));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << 1);
BSONObj secondIndex = BSON("b" << 1 << "c" << 1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:1
MergeSortStageParams msparams;
msparams.dedup = false;
msparams.pattern = BSON("c" << 1);
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < N; ++i) {
BSONObj first, second;
// We inserted N objects but we get 2 * N from the runner because of dups.
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
first = first.getOwned();
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b")) ||
(first.hasField("b") && second.hasField("a")));
}
// Should be done now.
BSONObj foo;
ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
// Decreasing indices merged ok. Basically the test above but decreasing.
class QueryStageMergeSortPrefixIndexReverse : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
// We insert a:1 c:i for i=0..49 but in reverse order for the heck of it.
insert(BSON("a" << 1 << "c" << N - i - 1));
insert(BSON("b" << 1 << "c" << i));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << -1);
BSONObj secondIndex = BSON("b" << 1 << "c" << -1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:-1
MergeSortStageParams msparams;
msparams.pattern = BSON("c" << -1);
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMaxKey(1);
params.bounds.endKey = objWithMinKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
// This is the direction along the index.
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < N; ++i) {
BSONObj first, second;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
first = first.getOwned();
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(N - i - 1, first["c"].numberInt());
ASSERT((first.hasField("a") && second.hasField("b")) ||
(first.hasField("b") && second.hasField("a")));
}
// Should be done now.
BSONObj foo;
ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
// One stage EOF immediately
class QueryStageMergeSortOneStageEOF : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
insert(BSON("a" << 1 << "c" << i));
insert(BSON("b" << 1 << "c" << i));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << 1);
BSONObj secondIndex = BSON("b" << 1 << "c" << 1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:1
MergeSortStageParams msparams;
msparams.pattern = BSON("c" << 1);
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:51 (EOF)
params.descriptor = getIndex(secondIndex, coll);
params.bounds.startKey = BSON("" << 51 << "" << MinKey);
params.bounds.endKey = BSON("" << 51 << "" << MaxKey);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
// Only getting results from the a:1 index scan.
for (int i = 0; i < N; ++i) {
BSONObj obj;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["c"].numberInt());
ASSERT_EQUALS(1, obj["a"].numberInt());
}
// Should be done now.
BSONObj foo;
ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
// N stages each have 1 result
class QueryStageMergeSortManyShort : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
unique_ptr ws = make_unique();
// Sort by foo:1
MergeSortStageParams msparams;
msparams.pattern = BSON("foo" << 1);
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
IndexScanParams params;
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
int numIndices = 20;
for (int i = 0; i < numIndices; ++i) {
// 'a', 'b', ...
string index(1, 'a' + i);
insert(BSON(index << 1 << "foo" << i));
BSONObj indexSpec = BSON(index << 1 << "foo" << 1);
addIndex(indexSpec);
params.descriptor = getIndex(indexSpec, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
}
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < numIndices; ++i) {
BSONObj obj;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
string index(1, 'a' + i);
ASSERT_EQUALS(1, obj[index].numberInt());
}
// Should be done now.
BSONObj foo;
ASSERT_EQUALS(PlanExecutor::IS_EOF, exec->getNext(&foo, NULL));
}
};
// Invalidation mid-run
class QueryStageMergeSortInvalidation : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
WorkingSet ws;
// Sort by foo:1
MergeSortStageParams msparams;
msparams.pattern = BSON("foo" << 1);
auto ms = make_unique(&_txn, msparams, &ws, coll);
IndexScanParams params;
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
// Index 'a'+i has foo equal to 'i'.
int numIndices = 20;
for (int i = 0; i < numIndices; ++i) {
// 'a', 'b', ...
string index(1, 'a' + i);
insert(BSON(index << 1 << "foo" << i));
BSONObj indexSpec = BSON(index << 1 << "foo" << 1);
addIndex(indexSpec);
params.descriptor = getIndex(indexSpec, coll);
ms->addChild(new IndexScan(&_txn, params, &ws, NULL));
}
set recordIds;
getRecordIds(&recordIds, coll);
set::iterator it = recordIds.begin();
// Get 10 results. Should be getting results in order of 'recordIds'.
int count = 0;
while (!ms->isEOF() && count < 10) {
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState status = ms->work(&id);
if (PlanStage::ADVANCED != status) {
continue;
}
WorkingSetMember* member = ws.get(id);
ASSERT_EQUALS(member->recordId, *it);
BSONElement elt;
string index(1, 'a' + count);
ASSERT(member->getFieldDotted(index, &elt));
ASSERT_EQUALS(1, elt.numberInt());
ASSERT(member->getFieldDotted("foo", &elt));
ASSERT_EQUALS(count, elt.numberInt());
++count;
++it;
}
// Invalidate recordIds[11]. Should force a fetch and return the deleted document.
ms->saveState();
ms->invalidate(&_txn, *it, INVALIDATION_DELETION);
ms->restoreState();
// Make sure recordIds[11] was fetched for us.
{
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState status;
do {
status = ms->work(&id);
} while (PlanStage::ADVANCED != status);
WorkingSetMember* member = ws.get(id);
ASSERT(!member->hasRecordId());
ASSERT(member->hasObj());
string index(1, 'a' + count);
BSONElement elt;
ASSERT_TRUE(member->getFieldDotted(index, &elt));
ASSERT_EQUALS(1, elt.numberInt());
ASSERT(member->getFieldDotted("foo", &elt));
ASSERT_EQUALS(count, elt.numberInt());
++it;
++count;
}
// And get the rest.
while (!ms->isEOF()) {
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState status = ms->work(&id);
if (PlanStage::ADVANCED != status) {
continue;
}
WorkingSetMember* member = ws.get(id);
ASSERT_EQUALS(member->recordId, *it);
BSONElement elt;
string index(1, 'a' + count);
ASSERT_TRUE(member->getFieldDotted(index, &elt));
ASSERT_EQUALS(1, elt.numberInt());
ASSERT(member->getFieldDotted("foo", &elt));
ASSERT_EQUALS(count, elt.numberInt());
++count;
++it;
}
}
};
// Test that if a WSM buffered inside the merge sort stage gets updated, we return the document and
// then correctly dedup if we see the same RecordId again.
class QueryStageMergeSortInvalidationMutationDedup : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
// Insert data.
insert(BSON("_id" << 4 << "a" << 4));
insert(BSON("_id" << 5 << "a" << 5));
insert(BSON("_id" << 6 << "a" << 6));
addIndex(BSON("a" << 1));
std::set rids;
getRecordIds(&rids, coll);
set::iterator it = rids.begin();
WorkingSet ws;
WorkingSetMember* member;
MergeSortStageParams msparams;
msparams.pattern = BSON("a" << 1);
auto ms = stdx::make_unique(&_txn, msparams, &ws, coll);
// First child scans [5, 10].
{
IndexScanParams params;
params.descriptor = getIndex(BSON("a" << 1), coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = BSON("" << 5);
params.bounds.endKey = BSON("" << 10);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
auto fetchStage = stdx::make_unique(
&_txn, &ws, new IndexScan(&_txn, params, &ws, nullptr), nullptr, coll);
ms->addChild(fetchStage.release());
}
// Second child scans [4, 10].
{
IndexScanParams params;
params.descriptor = getIndex(BSON("a" << 1), coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = BSON("" << 4);
params.bounds.endKey = BSON("" << 10);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
auto fetchStage = stdx::make_unique(
&_txn, &ws, new IndexScan(&_txn, params, &ws, nullptr), nullptr, coll);
ms->addChild(fetchStage.release());
}
// First doc should be {a: 4}.
member = getNextResult(&ws, ms.get());
ASSERT_EQ(member->getState(), WorkingSetMember::RID_AND_OBJ);
ASSERT_EQ(member->recordId, *it);
ASSERT_BSONOBJ_EQ(member->obj.value(), BSON("_id" << 4 << "a" << 4));
++it;
// Doc {a: 5} gets invalidated by an update.
ms->invalidate(&_txn, *it, INVALIDATION_MUTATION);
// Invalidated doc {a: 5} should still get returned.
member = getNextResult(&ws, ms.get());
ASSERT_EQ(member->getState(), WorkingSetMember::OWNED_OBJ);
ASSERT_BSONOBJ_EQ(member->obj.value(), BSON("_id" << 5 << "a" << 5));
++it;
// We correctly dedup the invalidated doc and return {a: 6} next.
member = getNextResult(&ws, ms.get());
ASSERT_EQ(member->getState(), WorkingSetMember::RID_AND_OBJ);
ASSERT_EQ(member->recordId, *it);
ASSERT_BSONOBJ_EQ(member->obj.value(), BSON("_id" << 6 << "a" << 6));
}
private:
WorkingSetMember* getNextResult(WorkingSet* ws, PlanStage* stage) {
while (!stage->isEOF()) {
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState status = stage->work(&id);
if (PlanStage::ADVANCED != status) {
continue;
}
return ws->get(id);
}
FAIL("Expected to produce another result but hit EOF");
return nullptr;
}
};
class QueryStageMergeSortStringsWithNullCollation : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
insert(BSON("a" << 1 << "c" << i << "d"
<< "abc"));
insert(BSON("b" << 1 << "c" << i << "d"
<< "cba"));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << 1 << "d" << 1);
BSONObj secondIndex = BSON("b" << 1 << "c" << 1 << "d" << 1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:1, d:1
MergeSortStageParams msparams;
msparams.pattern = BSON("c" << 1 << "d" << 1);
msparams.collator = nullptr;
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
// Must fetch if we want to easily pull out an obj.
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < N; ++i) {
BSONObj first, second;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
first = first.getOwned();
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
// {a: 1, c: i, d: "abc"} should precede {b: 1, c: i, d: "bca"}.
ASSERT(first.hasField("a") && second.hasField("b"));
}
// Should be done now.
BSONObj foo;
ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&foo, NULL));
}
};
class QueryStageMergeSortStringsRespectsCollation : public QueryStageMergeSortTestBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
const int N = 50;
for (int i = 0; i < N; ++i) {
insert(BSON("a" << 1 << "c" << i << "d"
<< "abc"));
insert(BSON("b" << 1 << "c" << i << "d"
<< "cba"));
}
BSONObj firstIndex = BSON("a" << 1 << "c" << 1 << "d" << 1);
BSONObj secondIndex = BSON("b" << 1 << "c" << 1 << "d" << 1);
addIndex(firstIndex);
addIndex(secondIndex);
unique_ptr ws = make_unique();
// Sort by c:1, d:1
MergeSortStageParams msparams;
msparams.pattern = BSON("c" << 1 << "d" << 1);
CollatorInterfaceMock collator(CollatorInterfaceMock::MockType::kReverseString);
msparams.collator = &collator;
MergeSortStage* ms = new MergeSortStage(&_txn, msparams, ws.get(), coll);
// a:1
IndexScanParams params;
params.descriptor = getIndex(firstIndex, coll);
params.bounds.isSimpleRange = true;
params.bounds.startKey = objWithMinKey(1);
params.bounds.endKey = objWithMaxKey(1);
params.bounds.boundInclusion = BoundInclusion::kIncludeBothStartAndEndKeys;
params.direction = 1;
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
// b:1
params.descriptor = getIndex(secondIndex, coll);
ms->addChild(new IndexScan(&_txn, params, ws.get(), NULL));
unique_ptr fetchStage =
make_unique(&_txn, ws.get(), ms, nullptr, coll);
// Must fetch if we want to easily pull out an obj.
auto statusWithPlanExecutor = PlanExecutor::make(
&_txn, std::move(ws), std::move(fetchStage), coll, PlanExecutor::YIELD_MANUAL);
ASSERT_OK(statusWithPlanExecutor.getStatus());
unique_ptr exec = std::move(statusWithPlanExecutor.getValue());
for (int i = 0; i < N; ++i) {
BSONObj first, second;
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&first, NULL));
first = first.getOwned();
ASSERT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&second, NULL));
ASSERT_EQUALS(first["c"].numberInt(), second["c"].numberInt());
ASSERT_EQUALS(i, first["c"].numberInt());
// {b: 1, c: i, d: "cba"} should precede {a: 1, c: i, d: "abc"}.
ASSERT(first.hasField("b") && second.hasField("a"));
}
// Should be done now.
BSONObj foo;
ASSERT_NOT_EQUALS(PlanExecutor::ADVANCED, exec->getNext(&foo, NULL));
}
};
class All : public Suite {
public:
All() : Suite("query_stage_merge_sort_test") {}
void setupTests() {
add();
add();
add();
add();
add();
add();
add();
add();
add();
add();
}
};
SuiteInstance queryStageMergeSortTest;
} // namespace