// documentsourcetests.cpp : Unit tests for DocumentSource classes.
/**
* Copyright (C) 2012-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/dbtests/dbtests.h"
namespace DocumentSourceCursorTests {
using boost::intrusive_ptr;
using std::unique_ptr;
using std::vector;
static const NamespaceString nss("unittests.documentsourcetests");
static const BSONObj metaTextScore = BSON("$meta"
<< "textScore");
BSONObj toBson(const intrusive_ptr& source) {
vector arr;
source->serializeToArray(arr);
ASSERT_EQUALS(arr.size(), 1UL);
return arr[0].getDocument().toBson();
}
class CollectionBase {
public:
CollectionBase() : client(&_opCtx) {}
~CollectionBase() {
client.dropCollection(nss.ns());
}
protected:
const ServiceContext::UniqueOperationContext _opCtxPtr = cc().makeOperationContext();
OperationContext& _opCtx = *_opCtxPtr;
DBDirectClient client;
};
namespace DocumentSourceCursor {
using mongo::DocumentSourceCursor;
class Base : public CollectionBase {
public:
Base() : _ctx(new ExpressionContext(&_opCtx, nss)) {
_ctx->tempDir = storageGlobalParams.dbpath + "/_tmp";
}
protected:
void createSource() {
// clean up first if this was called before
_source.reset();
_exec.reset();
OldClientWriteContext ctx(&_opCtx, nss.ns());
auto cq = uassertStatusOK(CanonicalQuery::canonicalize(
nss, /*query=*/BSONObj(), ExtensionsCallbackDisallowExtensions()));
_exec = uassertStatusOK(
getExecutor(&_opCtx, ctx.getCollection(), std::move(cq), PlanExecutor::YIELD_MANUAL));
_exec->saveState();
_exec->registerExec();
_source = DocumentSourceCursor::create(nss.ns(), _exec, _ctx);
}
intrusive_ptr ctx() {
return _ctx;
}
DocumentSourceCursor* source() {
return _source.get();
}
private:
// It is important that these are ordered to ensure correct destruction order.
std::shared_ptr _exec;
intrusive_ptr _ctx;
intrusive_ptr _source;
};
/** Create a DocumentSourceCursor. */
class Empty : public Base {
public:
void run() {
createSource();
// The DocumentSourceCursor doesn't hold a read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
// The collection is empty, so the source produces no results.
ASSERT(!source()->getNext());
// Exhausting the source releases the read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
}
};
/** Iterate a DocumentSourceCursor. */
class Iterate : public Base {
public:
void run() {
client.insert(nss.ns(), BSON("a" << 1));
createSource();
// The DocumentSourceCursor doesn't hold a read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
// The cursor will produce the expected result.
boost::optional next = source()->getNext();
ASSERT(bool(next));
ASSERT_EQUALS(Value(1), next->getField("a"));
// There are no more results.
ASSERT(!source()->getNext());
// Exhausting the source releases the read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
}
};
/** Dispose of a DocumentSourceCursor. */
class Dispose : public Base {
public:
void run() {
createSource();
// The DocumentSourceCursor doesn't hold a read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
source()->dispose();
// Releasing the cursor releases the read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
// The source is marked as exhausted.
ASSERT(!source()->getNext());
}
};
/** Iterate a DocumentSourceCursor and then dispose of it. */
class IterateDispose : public Base {
public:
void run() {
client.insert(nss.ns(), BSON("a" << 1));
client.insert(nss.ns(), BSON("a" << 2));
client.insert(nss.ns(), BSON("a" << 3));
createSource();
// The result is as expected.
boost::optional next = source()->getNext();
ASSERT(bool(next));
ASSERT_EQUALS(Value(1), next->getField("a"));
// The next result is as expected.
next = source()->getNext();
ASSERT(bool(next));
ASSERT_EQUALS(Value(2), next->getField("a"));
// The DocumentSourceCursor doesn't hold a read lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
source()->dispose();
// Disposing of the source releases the lock.
ASSERT(!_opCtx.lockState()->isReadLocked());
// The source cannot be advanced further.
ASSERT(!source()->getNext());
}
};
/** Set a value or await an expected value. */
class PendingValue {
public:
PendingValue(int initialValue) : _value(initialValue) {}
void set(int newValue) {
stdx::lock_guard lk(_mutex);
_value = newValue;
_condition.notify_all();
}
void await(int expectedValue) const {
stdx::unique_lock lk(_mutex);
while (_value != expectedValue) {
_condition.wait(lk);
}
}
private:
int _value;
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;
};
/** Test coalescing a limit into a cursor */
class LimitCoalesce : public Base {
public:
intrusive_ptr mkLimit(long long limit) {
return DocumentSourceLimit::create(ctx(), limit);
}
void run() {
client.insert(nss.ns(), BSON("a" << 1));
client.insert(nss.ns(), BSON("a" << 2));
client.insert(nss.ns(), BSON("a" << 3));
createSource();
Pipeline::SourceContainer container;
container.push_back(source());
container.push_back(mkLimit(10));
source()->optimizeAt(container.begin(), &container);
// initial limit becomes limit of cursor
ASSERT_EQUALS(container.size(), 1U);
ASSERT_EQUALS(source()->getLimit(), 10);
container.push_back(mkLimit(2));
source()->optimizeAt(container.begin(), &container);
// smaller limit lowers cursor limit
ASSERT_EQUALS(container.size(), 1U);
ASSERT_EQUALS(source()->getLimit(), 2);
container.push_back(mkLimit(3));
source()->optimizeAt(container.begin(), &container);
// higher limit doesn't effect cursor limit
ASSERT_EQUALS(container.size(), 1U);
ASSERT_EQUALS(source()->getLimit(), 2);
// The cursor allows exactly 2 documents through
ASSERT(bool(source()->getNext()));
ASSERT(bool(source()->getNext()));
ASSERT(!source()->getNext());
}
};
} // namespace DocumentSourceCursor
class All : public Suite {
public:
All() : Suite("documentsource") {}
void setupTests() {
add();
add();
add();
add();
add();
}
};
SuiteInstance myall;
} // namespace DocumentSourceCursorTests