/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include
#include
#include
#include
#include "mongo/base/checked_cast.h"
#include "mongo/base/init.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/json.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/kv/kv_engine_test_harness.h"
#include "mongo/db/storage/kv/kv_prefix.h"
#include "mongo/db/storage/record_store_test_harness.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace {
using std::unique_ptr;
using std::string;
using std::stringstream;
class PrefixedWiredTigerHarnessHelper final : public RecordStoreHarnessHelper {
public:
PrefixedWiredTigerHarnessHelper()
: _dbpath("wt_test"),
_engine(new WiredTigerKVEngine(kWiredTigerEngineName,
_dbpath.path(),
_cs.get(),
"",
1,
false,
false,
false,
false)) {
repl::ReplicationCoordinator::set(
getGlobalServiceContext(),
std::unique_ptr(new repl::ReplicationCoordinatorMock(
getGlobalServiceContext(), repl::ReplSettings())));
}
PrefixedWiredTigerHarnessHelper(StringData extraStrings) : _dbpath("wt_test") {}
virtual std::unique_ptr newNonCappedRecordStore() {
return newNonCappedRecordStore("a.b");
}
virtual std::unique_ptr newNonCappedRecordStore(const std::string& ns) {
WiredTigerRecoveryUnit* ru =
checked_cast(_engine->newRecoveryUnit());
OperationContextNoop opCtx(ru);
string uri = "table:" + ns;
const bool prefixed = true;
StatusWith result = WiredTigerRecordStore::generateCreateString(
kWiredTigerEngineName, ns, CollectionOptions(), "", prefixed);
ASSERT_TRUE(result.isOK());
std::string config = result.getValue();
{
WriteUnitOfWork uow(&opCtx);
WT_SESSION* s = ru->getSession()->getSession();
invariantWTOK(s->create(s, uri.c_str(), config.c_str()));
uow.commit();
}
WiredTigerRecordStore::Params params;
params.ns = ns;
params.uri = uri;
params.engineName = kWiredTigerEngineName;
params.isCapped = false;
params.isEphemeral = false;
params.cappedMaxSize = -1;
params.cappedMaxDocs = -1;
params.cappedCallback = nullptr;
params.sizeStorer = nullptr;
auto ret = stdx::make_unique(
_engine.get(), &opCtx, params, KVPrefix::generateNextPrefix());
ret->postConstructorInit(&opCtx);
return std::move(ret);
}
virtual std::unique_ptr newCappedRecordStore(int64_t cappedSizeBytes,
int64_t cappedMaxDocs) final {
return newCappedRecordStore("a.b", cappedSizeBytes, cappedMaxDocs);
}
virtual std::unique_ptr newCappedRecordStore(const std::string& ns,
int64_t cappedMaxSize,
int64_t cappedMaxDocs) {
WiredTigerRecoveryUnit* ru =
checked_cast(_engine->newRecoveryUnit());
OperationContextNoop opCtx(ru);
string uri = "table:a.b";
CollectionOptions options;
options.capped = true;
KVPrefix prefix = KVPrefix::generateNextPrefix();
StatusWith result = WiredTigerRecordStore::generateCreateString(
kWiredTigerEngineName, ns, options, "", prefix.isPrefixed());
ASSERT_TRUE(result.isOK());
std::string config = result.getValue();
{
WriteUnitOfWork uow(&opCtx);
WT_SESSION* s = ru->getSession()->getSession();
invariantWTOK(s->create(s, uri.c_str(), config.c_str()));
uow.commit();
}
WiredTigerRecordStore::Params params;
params.ns = ns;
params.uri = uri;
params.engineName = kWiredTigerEngineName;
params.isCapped = true;
params.isEphemeral = false;
params.cappedMaxSize = cappedMaxSize;
params.cappedMaxDocs = cappedMaxDocs;
params.cappedCallback = nullptr;
params.sizeStorer = nullptr;
auto ret =
stdx::make_unique(_engine.get(), &opCtx, params, prefix);
ret->postConstructorInit(&opCtx);
return std::move(ret);
}
virtual std::unique_ptr newRecoveryUnit() final {
return std::unique_ptr(
checked_cast(_engine->newRecoveryUnit()));
}
virtual bool supportsDocLocking() final {
return true;
}
virtual WT_CONNECTION* conn() const {
return _engine->getConnection();
}
private:
unittest::TempDir _dbpath;
const std::unique_ptr _cs = stdx::make_unique();
std::unique_ptr _engine;
};
std::unique_ptr makeHarnessHelper() {
return stdx::make_unique();
}
MONGO_INITIALIZER(RegisterHarnessFactory)(InitializerContext* const) {
mongo::registerHarnessHelperFactory(makeHarnessHelper);
return Status::OK();
}
TEST(WiredTigerRecordStoreTest, PrefixedTableScan) {
unique_ptr harnessHelper = newRecordStoreHarnessHelper();
unique_ptr rs = harnessHelper->newNonCappedRecordStore("a.b");
const int numDocs = 1000;
{ // Insert documents.
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
for (int num = 0; num < numDocs; ++num) {
WriteUnitOfWork uow(opCtx.get());
StatusWith res = rs->insertRecord(opCtx.get(), "a", 2, Timestamp(), false);
ASSERT_OK(res.getStatus());
uow.commit();
}
}
auto client = harnessHelper->serviceContext()->makeClient("client");
auto cursorCtx = harnessHelper->newOperationContext(client.get());
auto cursor = rs->getCursor(cursorCtx.get());
for (int num = 0; num < numDocs; ++num) {
ASSERT(cursor->next());
}
ASSERT(!cursor->next());
}
TEST(WiredTigerRecordStoreTest, PrefixedSeekingCursor) {
unique_ptr harnessHelper = newRecordStoreHarnessHelper();
unique_ptr rs = harnessHelper->newNonCappedRecordStore("a.b");
RecordId startRecordId;
const int numDocs = 1000;
{ // Insert documents.
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
for (int num = 0; num < numDocs; ++num) {
WriteUnitOfWork uow(opCtx.get());
StatusWith res = rs->insertRecord(opCtx.get(), "a", 2, Timestamp(), false);
if (startRecordId.isNull()) {
startRecordId = res.getValue();
}
ASSERT_OK(res.getStatus());
uow.commit();
}
}
auto client = harnessHelper->serviceContext()->makeClient("client");
auto cursorCtx = harnessHelper->newOperationContext(client.get());
auto cursor = rs->getCursor(cursorCtx.get());
for (int num = 0; num < numDocs; ++num) {
ASSERT(cursor->seekExact(RecordId(startRecordId.repr() + num)));
}
ASSERT(!cursor->seekExact(RecordId(startRecordId.repr() + numDocs)));
}
} // namespace
} // namespace mongo