/**
* Copyright (C) 2013-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
/**
* This file tests db/exec/fetch.cpp. Fetch goes to disk so we cannot test outside of a dbtest.
*/
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/json.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/dbtests/dbtests.h"
namespace QueryStageFetch {
using std::shared_ptr;
using std::unique_ptr;
using std::set;
class QueryStageFetchBase {
public:
QueryStageFetchBase() : _client(&_txn) {
}
virtual ~QueryStageFetchBase() {
_client.dropCollection(ns());
}
void getLocs(set* out, Collection* coll) {
auto cursor = coll->getCursor(&_txn);
while (auto record = cursor->next()) {
out->insert(record->id);
}
}
void insert(const BSONObj& obj) {
_client.insert(ns(), obj);
}
void remove(const BSONObj& obj) {
_client.remove(ns(), obj);
}
static const char* ns() { return "unittests.QueryStageFetch"; }
protected:
OperationContextImpl _txn;
DBDirectClient _client;
};
//
// Test that a WSM with an obj is passed through verbatim.
//
class FetchStageAlreadyFetched : public QueryStageFetchBase {
public:
void run() {
OldClientWriteContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
WorkingSet ws;
// Add an object to the DB.
insert(BSON("foo" << 5));
set locs;
getLocs(&locs, coll);
ASSERT_EQUALS(size_t(1), locs.size());
// Create a mock stage that returns the WSM.
unique_ptr mockStage(new QueuedDataStage(&ws));
// Mock data.
{
WorkingSetMember mockMember;
mockMember.state = WorkingSetMember::LOC_AND_UNOWNED_OBJ;
mockMember.loc = *locs.begin();
mockMember.obj = coll->docFor(&_txn, mockMember.loc);
// Points into our DB.
mockStage->pushBack(mockMember);
mockMember.state = WorkingSetMember::OWNED_OBJ;
mockMember.loc = RecordId();
mockMember.obj = Snapshotted(SnapshotId(), BSON("foo" << 6));
ASSERT_TRUE(mockMember.obj.value().isOwned());
mockStage->pushBack(mockMember);
}
unique_ptr fetchStage(new FetchStage(&_txn, &ws, mockStage.release(),
NULL, coll));
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState state;
// Don't bother doing any fetching if an obj exists already.
state = fetchStage->work(&id);
ASSERT_EQUALS(PlanStage::ADVANCED, state);
state = fetchStage->work(&id);
ASSERT_EQUALS(PlanStage::ADVANCED, state);
// No more data to fetch, so, EOF.
state = fetchStage->work(&id);
ASSERT_EQUALS(PlanStage::IS_EOF, state);
}
};
//
// Test matching with fetch.
//
class FetchStageFilter : public QueryStageFetchBase {
public:
void run() {
ScopedTransaction transaction(&_txn, MODE_IX);
Lock::DBLock lk(_txn.lockState(), nsToDatabaseSubstring(ns()), MODE_X);
OldClientContext ctx(&_txn, ns());
Database* db = ctx.db();
Collection* coll = db->getCollection(ns());
if (!coll) {
WriteUnitOfWork wuow(&_txn);
coll = db->createCollection(&_txn, ns());
wuow.commit();
}
WorkingSet ws;
// Add an object to the DB.
insert(BSON("foo" << 5));
set locs;
getLocs(&locs, coll);
ASSERT_EQUALS(size_t(1), locs.size());
// Create a mock stage that returns the WSM.
unique_ptr mockStage(new QueuedDataStage(&ws));
// Mock data.
{
WorkingSetMember mockMember;
mockMember.state = WorkingSetMember::LOC_AND_IDX;
mockMember.loc = *locs.begin();
// State is loc and index, shouldn't be able to get the foo data inside.
BSONElement elt;
ASSERT_FALSE(mockMember.getFieldDotted("foo", &elt));
mockStage->pushBack(mockMember);
}
// Make the filter.
BSONObj filterObj = BSON("foo" << 6);
StatusWithMatchExpression swme = MatchExpressionParser::parse(filterObj);
verify(swme.isOK());
unique_ptr filterExpr(swme.getValue());
// Matcher requires that foo==6 but we only have data with foo==5.
unique_ptr fetchStage(
new FetchStage(&_txn, &ws, mockStage.release(), filterExpr.get(), coll));
// First call should return a fetch request as it's not in memory.
WorkingSetID id = WorkingSet::INVALID_ID;
PlanStage::StageState state;
// Normally we'd return the object but we have a filter that prevents it.
state = fetchStage->work(&id);
ASSERT_EQUALS(PlanStage::NEED_TIME, state);
// No more data to fetch, so, EOF.
state = fetchStage->work(&id);
ASSERT_EQUALS(PlanStage::IS_EOF, state);
}
};
class All : public Suite {
public:
All() : Suite( "query_stage_fetch" ) { }
void setupTests() {
add();
add();
}
};
SuiteInstance queryStageFetchAll;
} // namespace QueryStageFetch