/**
* Copyright (C) 2013-2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
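/**
* This file tests PlanExecutor behavior across a simulated (forced) yield: an executor saves its
* state, is registered with its collection's cursor manager (registerExecutor), is deregistered
* again (deregisterExecutor) and restored, and must then either resume correctly or report that
* it was killed.
*/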
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/plan_stage.h"
#include "mongo/db/service_context.h"
#include "mongo/db/json.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/dbtests/dbtests.h"
namespace ExecutorRegistry {
// Allow std::unique_ptr to be named without qualification throughout this namespace.
using std::unique_ptr;
// Namespace of the collection that the fixture below repopulates and that every test scans.
// Its database, "unittests", is the one dropped by the drop-database test.
static const NamespaceString nss("unittests.ExecutorRegistryDiskLocInvalidation");
/**
 * Shared fixture for the executor registry tests.
 *
 * Construction takes a write context on 'nss', drops the collection and re-inserts N()
 * documents of the form {foo: i} for i in [0, N()). Helpers are provided to build a collection
 * scan executor and to register / deregister an executor with the collection's cursor manager.
 */
class ExecutorRegistryBase {
public:
    ExecutorRegistryBase() : _client(&_opCtx) {
        _ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns()));

        // Start every test from the same contents: {foo: 0} .. {foo: N() - 1}.
        _client.dropCollection(nss.ns());
        for (int i = 0; i < N(); ++i) {
            _client.insert(nss.ns(), BSON("foo" << i));
        }
    }

    /**
     * Return a plan executor that is going over the collection in nss.ns().
     *
     * The scan is forward and non-tailable, has no filter, and the executor is created with
     * PlanExecutor::YIELD_MANUAL. The executor is released from its owning pointer before being
     * returned, so the caller is responsible for destroying it.
     */
    PlanExecutor* getCollscan() {
        unique_ptr<WorkingSet> ws(new WorkingSet());

        CollectionScanParams params;
        params.collection = collection();
        params.direction = CollectionScanParams::FORWARD;
        params.tailable = false;
        unique_ptr<CollectionScan> scan(new CollectionScan(&_opCtx, params, ws.get(), nullptr));

        // Canonicalize an empty query so the executor has a query to hold on to.
        auto statusWithCQ = CanonicalQuery::canonicalize(nss, BSONObj());
        ASSERT_OK(statusWithCQ.getStatus());
        unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());

        // Takes ownership of 'ws', 'scan', and 'cq'.
        auto statusWithPlanExecutor = PlanExecutor::make(&_opCtx,
                                                         std::move(ws),
                                                         std::move(scan),
                                                         std::move(cq),
                                                         collection(),
                                                         PlanExecutor::YIELD_MANUAL);
        ASSERT_OK(statusWithPlanExecutor.getStatus());
        return statusWithPlanExecutor.getValue().release();
    }

    /**
     * Registers 'exec' with the cursor manager of the collection obtained through
     * getOrCreateCollection(nss), inside a WriteUnitOfWork that is committed before returning.
     */
    void registerExecutor(PlanExecutor* exec) {
        WriteUnitOfWork wuow(&_opCtx);
        _ctx->db()
            ->getOrCreateCollection(&_opCtx, nss.ns())
            ->getCursorManager()
            ->registerExecutor(exec);
        wuow.commit();
    }

    /**
     * Removes 'exec' from the cursor manager of the collection obtained through
     * getOrCreateCollection(nss), inside a WriteUnitOfWork that is committed before returning.
     */
    void deregisterExecutor(PlanExecutor* exec) {
        WriteUnitOfWork wuow(&_opCtx);
        _ctx->db()
            ->getOrCreateCollection(&_opCtx, nss.ns())
            ->getCursorManager()
            ->deregisterExecutor(exec);
        wuow.commit();
    }

    /**
     * Number of documents the constructor inserts into the test collection.
     */
    int N() const {
        return 50;
    }

    /**
     * Looks up the test collection through the current write context.
     */
    Collection* collection() {
        return _ctx->db()->getCollection(nss.ns());
    }

    // Order of these is important for initialization: '_client' is constructed from '_opCtx',
    // and '_ctx' is (re)created from '_opCtx' as well.
    OperationContextImpl _opCtx;
    unique_ptr<OldClientWriteContext> _ctx;
    DBDirectClient _client;
};
// Test that a registered runner receives invalidation notifications.
class ExecutorRegistryDiskLocInvalid : public ExecutorRegistryBase {
public:
void run() {
if (supportsDocLocking()) {
return;
}
unique_ptr run(getCollscan());
BSONObj obj;
// Read some of it.
for (int i = 0; i < 10; ++i) {
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
}
// Register it.
run->saveState();
registerExecutor(run.get());
// At this point it's safe to yield. forceYield would do that. Let's now simulate some
// stuff going on in the yield.
// Delete some data, namely the next 2 things we'd expect.
_client.remove(nss.ns(), BSON("foo" << 10));
_client.remove(nss.ns(), BSON("foo" << 11));
// At this point, we're done yielding. We recover our lock.
// Unregister the runner.
deregisterExecutor(run.get());
// And clean up anything that happened before.
run->restoreState();
// Make sure that the runner moved forward over the deleted data. We don't see foo==10
// or foo==11.
for (int i = 12; i < N(); ++i) {
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
}
ASSERT_EQUALS(PlanExecutor::IS_EOF, run->getNext(&obj, NULL));
}
};
// Test that registered runners are killed when their collection is dropped.
class ExecutorRegistryDropCollection : public ExecutorRegistryBase {
public:
void run() {
unique_ptr run(getCollscan());
BSONObj obj;
// Read some of it.
for (int i = 0; i < 10; ++i) {
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
}
// Save state and register.
run->saveState();
registerExecutor(run.get());
// Drop a collection that's not ours.
_client.dropCollection("unittests.someboguscollection");
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState();
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(10, obj["foo"].numberInt());
// Save state and register.
run->saveState();
registerExecutor(run.get());
// Drop our collection.
_client.dropCollection(nss.ns());
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
}
};
// Test that registered runners are killed when all indices are dropped on the collection.
class ExecutorRegistryDropAllIndices : public ExecutorRegistryBase {
public:
void run() {
unique_ptr run(getCollscan());
BSONObj obj;
ASSERT_OK(dbtests::createIndex(&_opCtx, nss.ns(), BSON("foo" << 1)));
// Read some of it.
for (int i = 0; i < 10; ++i) {
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
}
// Save state and register.
run->saveState();
registerExecutor(run.get());
// Drop all indices.
_client.dropIndexes(nss.ns());
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
}
};
// Test that registered runners are killed when an index is dropped on the collection.
class ExecutorRegistryDropOneIndex : public ExecutorRegistryBase {
public:
void run() {
unique_ptr run(getCollscan());
BSONObj obj;
ASSERT_OK(dbtests::createIndex(&_opCtx, nss.ns(), BSON("foo" << 1)));
// Read some of it.
for (int i = 0; i < 10; ++i) {
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
}
// Save state and register.
run->saveState();
registerExecutor(run.get());
// Drop a specific index.
_client.dropIndex(nss.ns(), BSON("foo" << 1));
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
}
};
// Test that registered runners are killed when their database is dropped.
class ExecutorRegistryDropDatabase : public ExecutorRegistryBase {
public:
void run() {
unique_ptr run(getCollscan());
BSONObj obj;
// Read some of it.
for (int i = 0; i < 10; ++i) {
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(i, obj["foo"].numberInt());
}
// Save state and register.
run->saveState();
registerExecutor(run.get());
// Drop a DB that's not ours. We can't have a lock at all to do this as dropping a DB
// requires a "global write lock."
_ctx.reset();
_client.dropDatabase("somesillydb");
_ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns()));
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState();
ASSERT_EQUALS(PlanExecutor::ADVANCED, run->getNext(&obj, NULL));
ASSERT_EQUALS(10, obj["foo"].numberInt());
// Save state and register.
run->saveState();
registerExecutor(run.get());
// Drop our DB. Once again, must give up the lock.
_ctx.reset();
_client.dropDatabase("unittests");
_ctx.reset(new OldClientWriteContext(&_opCtx, nss.ns()));
// Unregister and restore state.
deregisterExecutor(run.get());
run->restoreState();
_ctx.reset();
// PlanExecutor was killed.
ASSERT_EQUALS(PlanExecutor::DEAD, run->getNext(&obj, NULL));
}
};
// TODO: Test that this works with renaming a collection.
class All : public Suite {
public:
All() : Suite("executor_registry") {}
void setupTests() {
add();
add();
add();
add();
add();
}
};
// File-scope instance of the "executor_registry" suite.
// NOTE(review): constructing this object is presumably what registers the suite with the
// dbtests framework -- confirm against SuiteInstance.
SuiteInstance<All> executorRegistryAll;
} // namespace ExecutorRegistry