/**
 *    Copyright (C) 2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/db/storage/kv/kv_engine_test_harness.h" #include #include #include "mongo/base/init.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" namespace mongo { namespace { class WiredTigerKVHarnessHelper : public KVHarnessHelper { public: WiredTigerKVHarnessHelper(bool forRepair = false) : _dbpath("wt-kv-harness"), _forRepair(forRepair) { if (!hasGlobalServiceContext()) setGlobalServiceContext(ServiceContext::make()); _engine.reset(makeEngine()); repl::ReplicationCoordinator::set( getGlobalServiceContext(), std::unique_ptr(new repl::ReplicationCoordinatorMock( getGlobalServiceContext(), repl::ReplSettings()))); } virtual ~WiredTigerKVHarnessHelper() { _engine.reset(nullptr); // Cannot cleanup the global service context here, the test still have clients remaining. 
} virtual KVEngine* restartEngine() override { _engine.reset(nullptr); _engine.reset(makeEngine()); return _engine.get(); } virtual KVEngine* getEngine() override { return _engine.get(); } virtual WiredTigerKVEngine* getWiredTigerKVEngine() { return _engine.get(); } private: WiredTigerKVEngine* makeEngine() { return new WiredTigerKVEngine(kWiredTigerEngineName, _dbpath.path(), _cs.get(), "", 1, false, false, _forRepair, false); } const std::unique_ptr _cs = stdx::make_unique(); unittest::TempDir _dbpath; std::unique_ptr _engine; bool _forRepair; }; class WiredTigerKVEngineTest : public unittest::Test { public: void setUp() override { setGlobalServiceContext(ServiceContext::make()); Client::initThread(getThreadName()); _helper = makeHelper(); _engine = _helper->getWiredTigerKVEngine(); } void tearDown() override { _helper.reset(nullptr); Client::destroy(); setGlobalServiceContext({}); } std::unique_ptr makeOperationContext() { return std::make_unique(_engine->newRecoveryUnit()); } protected: virtual std::unique_ptr makeHelper() { return std::make_unique(); } std::unique_ptr _helper; WiredTigerKVEngine* _engine; }; class WiredTigerKVEngineRepairTest : public WiredTigerKVEngineTest { virtual std::unique_ptr makeHelper() override { return std::make_unique(true /* repair */); } }; TEST_F(WiredTigerKVEngineRepairTest, OrphanedDataFilesCanBeRecovered) { auto opCtxPtr = makeOperationContext(); std::string ns = "a.b"; std::string ident = "collection-1234"; std::string record = "abcd"; CollectionOptions options; std::unique_ptr rs; ASSERT_OK(_engine->createRecordStore(opCtxPtr.get(), ns, ident, options)); rs = _engine->getRecordStore(opCtxPtr.get(), ns, ident, options); ASSERT(rs); RecordId loc; { WriteUnitOfWork uow(opCtxPtr.get()); StatusWith res = rs->insertRecord(opCtxPtr.get(), record.c_str(), record.length() + 1, Timestamp()); ASSERT_OK(res.getStatus()); loc = res.getValue(); uow.commit(); } const boost::optional dataFilePath = _engine->getDataFilePathForIdent(ident); 
ASSERT(dataFilePath); ASSERT(boost::filesystem::exists(*dataFilePath)); const boost::filesystem::path tmpFile{dataFilePath->string() + ".tmp"}; ASSERT(!boost::filesystem::exists(tmpFile)); #ifdef _WIN32 auto status = _engine->recoverOrphanedIdent(opCtxPtr.get(), ns, ident, options); ASSERT_EQ(ErrorCodes::CommandNotSupported, status.code()); #else // Move the data file out of the way so the ident can be dropped. This not permitted on Windows // because the file cannot be moved while it is open. The implementation for orphan recovery is // also not implemented on Windows for this reason. boost::system::error_code err; boost::filesystem::rename(*dataFilePath, tmpFile, err); ASSERT(!err) << err.message(); ASSERT_OK(_engine->dropIdent(opCtxPtr.get(), ident)); // The data file is moved back in place so that it becomes an "orphan" of the storage // engine and the restoration process can be tested. boost::filesystem::rename(tmpFile, *dataFilePath, err); ASSERT(!err) << err.message(); auto status = _engine->recoverOrphanedIdent(opCtxPtr.get(), ns, ident, options); ASSERT_EQ(ErrorCodes::DataModifiedByRepair, status.code()); #endif } TEST_F(WiredTigerKVEngineRepairTest, UnrecoverableOrphanedDataFilesAreRebuilt) { auto opCtxPtr = makeOperationContext(); std::string ns = "a.b"; std::string ident = "collection-1234"; std::string record = "abcd"; CollectionOptions options; std::unique_ptr rs; ASSERT_OK(_engine->createRecordStore(opCtxPtr.get(), ns, ident, options)); rs = _engine->getRecordStore(opCtxPtr.get(), ns, ident, options); ASSERT(rs); RecordId loc; { WriteUnitOfWork uow(opCtxPtr.get()); StatusWith res = rs->insertRecord(opCtxPtr.get(), record.c_str(), record.length() + 1, Timestamp()); ASSERT_OK(res.getStatus()); loc = res.getValue(); uow.commit(); } const boost::optional dataFilePath = _engine->getDataFilePathForIdent(ident); ASSERT(dataFilePath); ASSERT(boost::filesystem::exists(*dataFilePath)); ASSERT_OK(_engine->dropIdent(opCtxPtr.get(), ident)); #ifdef _WIN32 auto 
status = _engine->recoverOrphanedIdent(opCtxPtr.get(), ns, ident, options); ASSERT_EQ(ErrorCodes::CommandNotSupported, status.code()); #else // The ident may not get immediately dropped, so ensure it is completely gone. boost::system::error_code err; boost::filesystem::remove(*dataFilePath, err); ASSERT(!err) << err.message(); // Create an empty data file. The subsequent call to recreate the collection will fail because // it is unsalvageable. boost::filesystem::ofstream fileStream(*dataFilePath); fileStream << ""; fileStream.close(); ASSERT(boost::filesystem::exists(*dataFilePath)); // This should recreate an empty data file successfully and move the old one to a name that ends // in ".corrupt". auto status = _engine->recoverOrphanedIdent(opCtxPtr.get(), ns, ident, options); ASSERT_EQ(ErrorCodes::DataModifiedByRepair, status.code()) << status.reason(); boost::filesystem::path corruptFile = (dataFilePath->string() + ".corrupt"); ASSERT(boost::filesystem::exists(corruptFile)); rs = _engine->getRecordStore(opCtxPtr.get(), ns, ident, options); RecordData data; ASSERT_FALSE(rs->findRecord(opCtxPtr.get(), loc, &data)); #endif } TEST_F(WiredTigerKVEngineTest, TestOplogTruncation) { auto opCtxPtr = makeOperationContext(); // The initial data timestamp has to be set to take stable checkpoints. The first stable // timestamp greater than this will also trigger a checkpoint. The following loop of the // CheckpointThread will observe the new `checkpointDelaySecs` value. _engine->setInitialDataTimestamp(Timestamp(1, 1)); wiredTigerGlobalOptions.checkpointDelaySecs = 1; // A method that will poll the WiredTigerKVEngine until it sees the amount of oplog necessary // for crash recovery exceeds the input. auto assertPinnedMovesSoon = [this](Timestamp newPinned) { // If the current oplog needed for rollback does not exceed the requested pinned out, we // cannot expect the CheckpointThread to eventually publish a sufficient crash recovery // value. 
ASSERT_TRUE(_engine->getOplogNeededForRollback() >= newPinned); // Do 100 iterations that sleep for 100 milliseconds between polls. This will wait for up // to 10 seconds to observe an asynchronous update that iterates once per second. for (auto iterations = 0; iterations < 100; ++iterations) { if (_engine->getPinnedOplog() >= newPinned) { ASSERT_TRUE(_engine->getOplogNeededForCrashRecovery().get() >= newPinned); return; } sleepmillis(100); } unittest::log() << "Expected the pinned oplog to advance. Expected value: " << newPinned << " Published value: " << _engine->getOplogNeededForCrashRecovery(); FAIL(""); }; _engine->setStableTimestamp(Timestamp(10, 1), boost::none); assertPinnedMovesSoon(Timestamp(10, 1)); _engine->setStableTimestamp(Timestamp(20, 1), Timestamp(15, 1)); assertPinnedMovesSoon(Timestamp(15, 1)); _engine->setStableTimestamp(Timestamp(30, 1), Timestamp(19, 1)); assertPinnedMovesSoon(Timestamp(19, 1)); _engine->setStableTimestamp(Timestamp(30, 1), boost::none); assertPinnedMovesSoon(Timestamp(30, 1)); } std::unique_ptr makeHelper() { return stdx::make_unique(); } MONGO_INITIALIZER(RegisterKVHarnessFactory)(InitializerContext*) { KVHarnessHelper::registerFactory(makeHelper); return Status::OK(); } } // namespace } // namespace mongo