// documentsourcetests.cpp : Unit tests for DocumentSource classes.
/**
* Copyright (C) 2012-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/multi_plan.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/query/stage_builder.h"
#include "mongo/dbtests/dbtests.h"
namespace DocumentSourceCursorTests {
// Bring the smart-pointer and container names used by the tests below into scope.
using boost::intrusive_ptr;
using std::unique_ptr;
using std::vector;
// Namespace of the single collection that the tests in this file read, write and drop.
static const NamespaceString nss("unittests.documentsourcetests");
// The object {$meta: "textScore"}. NOTE(review): no test visible in this file references it.
static const BSONObj metaTextScore = BSON("$meta"
<< "textScore");
/**
 * Serializes 'source' and returns the result as a BSONObj.
 *
 * The source is serialized with serializeToArray(); the test asserts that exactly one
 * element was produced and converts that element's document to BSON.
 */
BSONObj toBson(const intrusive_ptr<DocumentSource>& source) {
    vector<Value> serialized;
    source->serializeToArray(serialized);
    ASSERT_EQUALS(serialized.size(), 1UL);
    return serialized[0].getDocument().toBson();
}
/**
 * Root fixture for the tests in this file.
 *
 * Creates an OperationContext from the current client and a DBDirectClient bound to it.
 * The destructor drops the 'nss' collection.
 */
class CollectionBase {
public:
    CollectionBase() : client(&_opCtx) {}

    ~CollectionBase() {
        // Remove whatever the test left in the collection.
        client.dropCollection(nss.ns());
    }

protected:
    // Keep this declaration order: members are initialized top to bottom, and each one is
    // built from the member declared just before it.
    const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext();
    OperationContext& _opCtx = *_opCtxPtr;
    DBDirectClient client;
};
namespace DocumentSourceCursor {
using mongo::DocumentSourceCursor;
class Base : public CollectionBase {
public:
Base() : _ctx(new ExpressionContextForTest(&_opCtx, AggregationRequest(nss, {}))) {
_ctx->tempDir = storageGlobalParams.dbpath + "/_tmp";
}
protected:
void createSource(boost::optional hint = boost::none) {
// clean up first if this was called before
_source.reset();
OldClientWriteContext ctx(&_opCtx, nss.ns());
auto qr = stdx::make_unique(nss);
if (hint) {
qr->setHint(*hint);
}
auto cq = uassertStatusOK(CanonicalQuery::canonicalize(
&_opCtx, std::move(qr), ExtensionsCallbackDisallowExtensions()));
auto exec = uassertStatusOK(
getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::NO_YIELD));
exec->saveState();
_source = DocumentSourceCursor::create(ctx.getCollection(), std::move(exec), _ctx);
}
intrusive_ptr ctx() {
return _ctx;
}
DocumentSourceCursor* source() {
return _source.get();
}
private:
// It is important that these are ordered to ensure correct destruction order.
intrusive_ptr _ctx;
intrusive_ptr _source;
};
/** Build a DocumentSourceCursor over an empty collection and drain it. */
class Empty : public Base {
public:
    void run() {
        // Re-queries the lock state on every call.
        const auto holdsReadLock = [this] { return _opCtx.lockState()->isReadLocked(); };

        createSource();
        auto* const cursor = source();

        // Building the source must not leave a read lock held.
        ASSERT(!holdsReadLock());
        // Nothing was inserted, so the very first pull reports end-of-stream.
        ASSERT(cursor->getNext().isEOF());
        // No read lock is held after the source has been exhausted either.
        ASSERT(!holdsReadLock());
    }
};
/** Pull a single document through a DocumentSourceCursor and reach end-of-stream. */
class Iterate : public Base {
public:
    void run() {
        // Re-queries the lock state on every call.
        const auto holdsReadLock = [this] { return _opCtx.lockState()->isReadLocked(); };

        client.insert(nss.ns(), BSON("a" << 1));
        createSource();
        auto* const cursor = source();

        // Building the source must not leave a read lock held.
        ASSERT(!holdsReadLock());

        // The one inserted document comes back first.
        auto first = cursor->getNext();
        ASSERT(first.isAdvanced());
        ASSERT_VALUE_EQ(Value(1), first.getDocument().getField("a"));

        // After that the source is exhausted.
        ASSERT(cursor->getNext().isEOF());
        // No read lock is held after exhaustion.
        ASSERT(!holdsReadLock());
    }
};
/** Dispose of a DocumentSourceCursor before reading anything from it. */
class Dispose : public Base {
public:
    void run() {
        // Re-queries the lock state on every call.
        const auto holdsReadLock = [this] { return _opCtx.lockState()->isReadLocked(); };

        createSource();
        auto* const cursor = source();

        // Building the source must not leave a read lock held.
        ASSERT(!holdsReadLock());

        cursor->dispose();

        // No read lock is held once the source has been disposed.
        ASSERT(!holdsReadLock());
        // A disposed source reports end-of-stream.
        ASSERT(cursor->getNext().isEOF());
    }
};
/** Read part of a DocumentSourceCursor's results, then dispose of it. */
class IterateDispose : public Base {
public:
    void run() {
        // Populate the collection with {a: 1}, {a: 2}, {a: 3}.
        for (int a = 1; a <= 3; ++a) {
            client.insert(nss.ns(), BSON("a" << a));
        }
        createSource();
        auto* const cursor = source();

        // The first two results come back in insertion order.
        for (int expected = 1; expected <= 2; ++expected) {
            auto result = cursor->getNext();
            ASSERT(result.isAdvanced());
            ASSERT_VALUE_EQ(Value(expected), result.getDocument().getField("a"));
        }

        // No read lock is held between calls to getNext().
        ASSERT(!_opCtx.lockState()->isReadLocked());

        cursor->dispose();

        // No read lock is held after disposal.
        ASSERT(!_opCtx.lockState()->isReadLocked());
        // The third document is never delivered: a disposed source is at end-of-stream.
        ASSERT(cursor->getNext().isEOF());
    }
};
/** Set a value or await an expected value. */
class PendingValue {
public:
PendingValue(int initialValue) : _value(initialValue) {}
void set(int newValue) {
stdx::lock_guard lk(_mutex);
_value = newValue;
_condition.notify_all();
}
void await(int expectedValue) const {
stdx::unique_lock lk(_mutex);
while (_value != expectedValue) {
_condition.wait(lk);
}
}
private:
int _value;
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;
};
/** Test coalescing a limit into a cursor. */
class LimitCoalesce : public Base {
public:
    /** Creates a $limit stage for 'limit' documents, bound to this fixture's context. */
    intrusive_ptr<DocumentSource> mkLimit(long long limit) {
        return DocumentSourceLimit::create(ctx(), limit);
    }

    void run() {
        client.insert(nss.ns(), BSON("a" << 1));
        client.insert(nss.ns(), BSON("a" << 2));
        client.insert(nss.ns(), BSON("a" << 3));
        createSource();

        Pipeline::SourceContainer container;
        container.push_back(source());
        container.push_back(mkLimit(10));
        source()->optimizeAt(container.begin(), &container);

        // The initial limit is removed from the pipeline and becomes the cursor's limit.
        ASSERT_EQUALS(container.size(), 1U);
        ASSERT_EQUALS(source()->getLimit(), 10);

        container.push_back(mkLimit(2));
        source()->optimizeAt(container.begin(), &container);
        // A smaller limit lowers the cursor's limit.
        ASSERT_EQUALS(container.size(), 1U);
        ASSERT_EQUALS(source()->getLimit(), 2);

        container.push_back(mkLimit(3));
        source()->optimizeAt(container.begin(), &container);
        // A higher limit is absorbed too but doesn't affect the cursor's limit.
        ASSERT_EQUALS(container.size(), 1U);
        ASSERT_EQUALS(source()->getLimit(), 2);

        // The cursor allows exactly 2 of the 3 documents through.
        ASSERT(source()->getNext().isAdvanced());
        ASSERT(source()->getNext().isAdvanced());
        ASSERT(source()->getNext().isEOF());
    }
};
//
// Test cursor output sort.
//
/** A source hinted with {$natural: 1} reports no output sorts. */
class CollectionScanProvidesNoSort : public Base {
public:
    void run() {
        const auto naturalHint = BSON("$natural" << 1);
        createSource(naturalHint);
        auto* const cursor = source();

        ASSERT_EQ(cursor->getOutputSorts().size(), 0U);
        cursor->dispose();
    }
};
/** A source hinted to use the {a: 1} index reports {a: 1} as its only output sort. */
class IndexScanProvidesSortOnKeys : public Base {
public:
    void run() {
        const auto indexSpec = BSON("a" << 1);
        client.createIndex(nss.ns(), indexSpec);
        createSource(indexSpec);
        auto* const cursor = source();

        const auto sorts = cursor->getOutputSorts();
        ASSERT_EQ(sorts.size(), 1U);
        ASSERT_EQ(sorts.count(indexSpec), 1U);
        cursor->dispose();
    }
};
/** A source hinted to use the descending {a: -1} index reports {a: -1} as its only output sort. */
class ReverseIndexScanProvidesSort : public Base {
public:
    void run() {
        const auto indexSpec = BSON("a" << -1);
        client.createIndex(nss.ns(), indexSpec);
        createSource(indexSpec);
        auto* const cursor = source();

        const auto sorts = cursor->getOutputSorts();
        ASSERT_EQ(sorts.size(), 1U);
        ASSERT_EQ(sorts.count(indexSpec), 1U);
        cursor->dispose();
    }
};
/**
 * A source hinted to use the compound {a: 1, b: -1} index reports two output sorts:
 * the prefix {a: 1} and the full key pattern {a: 1, b: -1}.
 */
class CompoundIndexScanProvidesMultipleSorts : public Base {
public:
    void run() {
        const auto compoundSpec = BSON("a" << 1 << "b" << -1);
        client.createIndex(nss.ns(), compoundSpec);
        createSource(compoundSpec);
        auto* const cursor = source();

        const auto sorts = cursor->getOutputSorts();
        ASSERT_EQ(sorts.size(), 2U);
        ASSERT_EQ(sorts.count(BSON("a" << 1)), 1U);
        ASSERT_EQ(sorts.count(compoundSpec), 1U);
        cursor->dispose();
    }
};
/** serialize() emits progressively more of the "$cursor" explain output as verbosity rises. */
class SerializationRespectsExplainModes : public Base {
public:
    void run() {
        createSource();
        auto* const cursor = source();

        {
            // No explain verbosity: nothing is serialized at all.
            auto serialized = cursor->serialize();
            ASSERT_TRUE(serialized.missing());
        }

        {
            // kQueryPlanner: planner section only.
            auto serialized = cursor->serialize(ExplainOptions::Verbosity::kQueryPlanner);
            const auto cursorStage = serialized["$cursor"];
            ASSERT_FALSE(cursorStage["queryPlanner"].missing());
            ASSERT_TRUE(cursorStage["executionStats"].missing());
        }

        {
            // kExecStats: planner section plus execution stats, without per-plan detail.
            auto serialized = cursor->serialize(ExplainOptions::Verbosity::kExecStats);
            const auto cursorStage = serialized["$cursor"];
            ASSERT_FALSE(cursorStage["queryPlanner"].missing());
            ASSERT_FALSE(cursorStage["executionStats"].missing());
            ASSERT_TRUE(cursorStage["executionStats"]["allPlansExecution"].missing());
        }

        {
            // kExecAllPlans: everything, including the per-plan execution detail.
            auto serialized =
                cursor->serialize(ExplainOptions::Verbosity::kExecAllPlans).getDocument();
            const auto cursorStage = serialized["$cursor"];
            ASSERT_FALSE(cursorStage["queryPlanner"].missing());
            ASSERT_FALSE(cursorStage["executionStats"].missing());
            ASSERT_FALSE(cursorStage["executionStats"]["allPlansExecution"].missing());
        }

        cursor->dispose();
    }
};
} // namespace DocumentSourceCursor
class All : public Suite {
public:
All() : Suite("documentsource") {}
void setupTests() {
add();
add();
add();
add();
add();
add();
add();
add();
add();
add();
}
};
SuiteInstance myall;
} // namespace DocumentSourceCursorTests