// @file dur_recover.cpp crash recovery via the journal

/**
 * Copyright (C) 2009 10gen Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kJournal

#include "mongo/platform/basic.h"

#include "mongo/db/storage/mmap_v1/dur_recover.h"

#include <boost/filesystem/operations.hpp>
#include <fcntl.h>
#include <iomanip>
#include <iostream>
#include <sys/stat.h>

#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/storage/mmap_v1/compress.h"
#include "mongo/db/storage/mmap_v1/dur_commitjob.h"
#include "mongo/db/storage/mmap_v1/dur_journal.h"
#include "mongo/db/storage/mmap_v1/dur_journalformat.h"
#include "mongo/db/storage/mmap_v1/dur_stats.h"
#include "mongo/db/storage/mmap_v1/durable_mapped_file.h"
#include "mongo/db/storage/mmap_v1/durop.h"
#include "mongo/db/storage/mmap_v1/mmap_v1_options.h"
#include "mongo/platform/strnlen.h"
#include "mongo/util/bufreader.h"
#include "mongo/util/checksum.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/exit.h"
#include "mongo/util/hex.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/startup_test.h"

namespace mongo {

using std::shared_ptr;
using std::unique_ptr;
using std::endl;
using std::hex;
using std::map;
using std::pair;
using std::setw;
using std::string;
using std::stringstream;
using std::vector;

/**
 * Thrown when a journal section is corrupt. This is considered OK as long as it occurs while
 * processing the last file. Processing stops at the first corrupt section.
 *
 * Any logging about the nature of the corruption should happen before throwing as this class
 * contains no data.
 */
class JournalSectionCorruptException {};

namespace dur {

// The singleton recovery job object
RecoveryJob& RecoveryJob::_instance = *(new RecoveryJob());

void removeJournalFiles();
boost::filesystem::path getJournalDir();
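// A sketch of the journal file layout as the recovery code below consumes it (exact field
// layouts live in dur_journalformat.h; this summary is derived from processFileBuffer() and
// processSection() further down):
//
//   JHeader                 the file header; carries the version and a fileId
//   then repeated sections, one per group commit:
//     JSectHeader           seqNumber, fileId, sectionLen()
//     section body          JDbContext / JEntry / DurOp entries, compressed on disk
//     JSectFooter           checksum over the section header and body
//
// Recovery validates each section's checksum and stops at the first corrupt section.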
struct ParsedJournalEntry { /*copyable*/
    ParsedJournalEntry() : e(0) {}

    // relative path of database for the operation.
    // might be a pointer into the mmapped journal file
    const char* dbName;

    // those are pointers into the memory mapped journal file
    const JEntry* e;  // local db sentinel is already parsed out here into dbName

    // if not one of the two simple JEntry's above, this is the operation:
    std::shared_ptr<DurOp> op;
};

/**
 * Get journal filenames, in order. Throws if unexpected content found.
 */
static void getFiles(boost::filesystem::path dir, vector<boost::filesystem::path>& files) {
    map<unsigned, boost::filesystem::path> m;
    for (boost::filesystem::directory_iterator i(dir); i != boost::filesystem::directory_iterator();
         ++i) {
        boost::filesystem::path filepath = *i;
        string fileName = boost::filesystem::path(*i).leaf().string();
        if (str::startsWith(fileName, "j._")) {
            unsigned u = str::toUnsigned(str::after(fileName, '_'));
            if (m.count(u)) {
                uasserted(13531,
                          str::stream() << "unexpected files in journal directory " << dir.string()
                                        << " : "
                                        << fileName);
            }
            m.insert(pair<unsigned, boost::filesystem::path>(u, filepath));
        }
    }
    for (map<unsigned, boost::filesystem::path>::iterator i = m.begin(); i != m.end(); ++i) {
        if (i != m.begin() && m.count(i->first - 1) == 0) {
            uasserted(13532,
                      str::stream() << "unexpected file in journal directory " << dir.string()
                                    << " : "
                                    << boost::filesystem::path(i->second).leaf().string()
                                    << " : can't find its preceding file");
        }
        files.push_back(i->second);
    }
}
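// Journal files are named j._<n> with consecutive numbering (j._0, j._1, ...). getFiles()
// above rejects a gap (e.g. finding j._2 when j._1 is absent fails with code 13532) because
// the files must be replayed as one unbroken sequence.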
/** read through the memory mapped data of a journal file (journal/j._<n> file)
    throws
*/
class JournalSectionIterator {
    MONGO_DISALLOW_COPYING(JournalSectionIterator);

public:
    JournalSectionIterator(const JSectHeader& h,
                           const void* compressed,
                           unsigned compressedLen,
                           bool doDurOpsRecovering)
        : _h(h), _lastDbName(0), _doDurOps(doDurOpsRecovering) {
        verify(doDurOpsRecovering);

        if (!uncompress((const char*)compressed, compressedLen, &_uncompressed)) {
            // We check the checksum before we uncompress, but this may still fail as the
            // checksum isn't foolproof.
            log() << "couldn't uncompress journal section" << endl;
            throw JournalSectionCorruptException();
        }

        const char* p = _uncompressed.c_str();
        verify(compressedLen == _h.sectionLen() - sizeof(JSectFooter) - sizeof(JSectHeader));

        _entries = unique_ptr<BufReader>(new BufReader(p, _uncompressed.size()));
    }

    // We work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
    JournalSectionIterator(const JSectHeader& h, const void* p, unsigned len)
        : _entries(new BufReader((const char*)p, len)), _h(h), _lastDbName(0), _doDurOps(false) {}

    bool atEof() const {
        return _entries->atEof();
    }

    unsigned long long seqNumber() const {
        return _h.seqNumber;
    }

    /** get the next entry from the log. this function parses and combines JDbContext and
     *  JEntry's. throws on premature end of section.
     */
    void next(ParsedJournalEntry& e) {
        unsigned lenOrOpCode{};
        _entries->read(lenOrOpCode);

        if (lenOrOpCode > JEntry::OpCode_Min) {
            switch (lenOrOpCode) {
                case JEntry::OpCode_Footer: {
                    verify(false);
                }

                case JEntry::OpCode_FileCreated:
                case JEntry::OpCode_DropDb: {
                    e.dbName = 0;
                    std::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_entries);
                    if (_doDurOps) {
                        e.op = op;
                    }
                    return;
                }

                case JEntry::OpCode_DbContext: {
                    _lastDbName = (const char*)_entries->pos();
                    const unsigned limit = _entries->remaining();
                    const unsigned len = strnlen(_lastDbName, limit);
                    if (_lastDbName[len] != '\0') {
                        log() << "problem processing journal file during recovery";
                        throw JournalSectionCorruptException();
                    }
                    _entries->skip(len + 1);      // skip '\0' too
                    _entries->read(lenOrOpCode);  // read this for the fall through
                }
                // fall through as a basic operation always follows jdbcontext, and we don't have
                // anything to return yet

                default:
                    // fall through
                    ;
            }
        }

        // JEntry - a basic write
        verify(lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min);
        _entries->rewind(4);
        e.e = (JEntry*)_entries->skip(sizeof(JEntry));
        e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName;
        verify(e.e->len == lenOrOpCode);
        _entries->skip(e.e->len);
    }

private:
    unique_ptr<BufReader> _entries;
    const JSectHeader _h;
    const char* _lastDbName;  // pointer into the mmapped journal file
    const bool _doDurOps;
    string _uncompressed;
};
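// Entry stream encoding, as next() above decodes it: every entry starts with a 32-bit word.
// A value above JEntry::OpCode_Min is an opcode (footer, file-created, drop-db, db-context);
// a smaller non-zero value is the len field of a JEntry, i.e. a basic write of len bytes at
// offset ofs in the data file named by getFileNo(). A db-context entry merely sets
// _lastDbName for the basic write that always follows it.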
static string fileName(const char* dbName, int fileNo) {
    stringstream ss;
    ss << dbName << '.';
    verify(fileNo >= 0);
    if (fileNo == JEntry::DotNsSuffix)
        ss << "ns";
    else
        ss << fileNo;

    // relative name -> full path name
    boost::filesystem::path full(storageGlobalParams.dbpath);
    full /= ss.str();
    return full.string();
}

RecoveryJob::RecoveryJob()
    : _recovering(false),
      _lastDataSyncedFromLastRun(0),
      _lastSeqSkipped(0),
      _appliedAnySections(false) {}

RecoveryJob::~RecoveryJob() {
    invariant(!"RecoveryJob is intentionally leaked with a bare call to operator new()");
}

void RecoveryJob::close(OperationContext* txn) {
    stdx::lock_guard<SimpleMutex> lk(_mx);
    _close(txn);
}

void RecoveryJob::_close(OperationContext* txn) {
    MongoFile::flushAll(txn, true);
    LockMongoFilesExclusive lock(txn);
    for (auto& durFile : _mmfs) {
        durFile->close(txn);
    }
    _mmfs.clear();
}

RecoveryJob::Last::Last(OperationContext* txn) : _txn(txn), mmf(NULL), fileNo(-1) {
    // Make sure the files list does not change from underneath
    LockMongoFilesShared::assertAtLeastReadLocked(txn);
}

DurableMappedFile* RecoveryJob::Last::newEntry(const dur::ParsedJournalEntry& entry,
                                               RecoveryJob& rj) {
    int num = entry.e->getFileNo();
    if (num == fileNo && entry.dbName == dbName)
        return mmf;

    string fn = fileName(entry.dbName, num);
    MongoFile* file;
    {
        MongoFileFinder finder(_txn);  // must release lock before creating new DurableMappedFile
        file = finder.findByPath(fn);
    }

    if (file) {
        verify(file->isDurableMappedFile());
        mmf = (DurableMappedFile*)file;
    } else {
        if (!rj._recovering) {
            log() << "journal error applying writes, file " << fn << " is not open" << endl;
            verify(false);
        }
        std::shared_ptr<DurableMappedFile> sp(new DurableMappedFile(_txn));
        verify(sp->open(_txn, fn));
        rj._mmfs.push_back(sp);
        mmf = sp.get();
    }

    // we do this last so that if an exception were thrown, there isn't any wrong memory
    dbName = entry.dbName;
    fileNo = num;

    return mmf;
}

void RecoveryJob::write(Last& last, const ParsedJournalEntry& entry) {
    // TODO(mathias): look into making some of these dasserts
    verify(entry.e);
    verify(entry.dbName);

    DurableMappedFile* mmf = last.newEntry(entry, *this);

    if ((entry.e->ofs + entry.e->len) <= mmf->length()) {
        verify(mmf->view_write());
        verify(entry.e->srcData());

        void* dest = (char*)mmf->view_write() + entry.e->ofs;
        memcpy(dest, entry.e->srcData(), entry.e->len);
        stats.curr()->_writeToDataFilesBytes += entry.e->len;
    } else {
        massert(13622, "Trying to write past end of file in WRITETODATAFILES", _recovering);
    }
}

void RecoveryJob::applyEntry(Last& last, const ParsedJournalEntry& entry, bool apply, bool dump) {
    if (entry.e) {
        if (dump) {
            stringstream ss;
            ss << "  BASICWRITE " << setw(20) << entry.dbName << '.';
            if (entry.e->isNsSuffix())
                ss << "ns";
            else
                ss << setw(2) << entry.e->getFileNo();
            ss << ' ' << setw(6) << entry.e->len << ' '
               << /*hex << setw(8) << (size_t) fqe.srcData << dec << */ "  "
               << redact(hexdump(entry.e->srcData(), entry.e->len));
            log() << ss.str() << endl;
        }
        if (apply) {
            write(last, entry);
        }
    } else if (entry.op) {
        // a DurOp subclass operation
        if (dump) {
            log() << "  OP " << redact(entry.op->toString()) << endl;
        }
        if (apply) {
            if (entry.op->needFilesClosed()) {
                _close(last.txn());  // locked in processSection
            }
            entry.op->replay();
        }
    }
}

void RecoveryJob::applyEntries(OperationContext* txn, const vector<ParsedJournalEntry>& entries) {
    const bool apply = (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalScanOnly) == 0;
    const bool dump = (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalDumpJournal);

    if (dump) {
        log() << "BEGIN section" << endl;
    }

    Last last(txn);
    for (vector<ParsedJournalEntry>::const_iterator i = entries.begin(); i != entries.end(); ++i) {
        applyEntry(last, *i, apply, dump);
    }

    if (dump) {
        log() << "END section" << endl;
    }
}
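// For illustration of fileName()/newEntry() above, assuming a dbpath of /data/db (a made-up
// example path): fileName("test", 3) yields "/data/db/test.3", and a fileNo of
// JEntry::DotNsSuffix yields "/data/db/test.ns", the namespace file. Last::newEntry() caches
// the most recent (dbName, fileNo) pair so consecutive writes to the same data file skip the
// path lookup.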
void RecoveryJob::processSection(OperationContext* txn,
                                 const JSectHeader* h,
                                 const void* p,
                                 unsigned len,
                                 const JSectFooter* f) {
    LockMongoFilesShared lkFiles(txn);  // for RecoveryJob::Last
    stdx::lock_guard<SimpleMutex> lk(_mx);

    if (_recovering) {
        // Check the footer checksum before doing anything else.
        verify(((const char*)h) + sizeof(JSectHeader) == p);
        if (!f->checkHash(h, len + sizeof(JSectHeader))) {
            log() << "journal section checksum doesn't match";
            throw JournalSectionCorruptException();
        }

        static uint64_t numJournalSegmentsSkipped = 0;
        static const uint64_t kMaxSkippedSectionsToLog = 10;
        if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
            if (_appliedAnySections) {
                severe() << "Journal section sequence number " << h->seqNumber
                         << " is lower than the threshold for applying ("
                         << h->seqNumber + ExtraKeepTimeMs
                         << ") but we have already applied some journal sections. This implies a "
                         << "corrupt journal file.";
                fassertFailed(34369);
            }

            if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) {
                log() << "recover skipping application of section seq:" << h->seqNumber
                      << " < lsn:" << _lastDataSyncedFromLastRun << endl;
            } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) {
                log() << "recover skipping application of section more..." << endl;
            }
            _lastSeqSkipped = h->seqNumber;
            return;
        }

        if (!_appliedAnySections) {
            _appliedAnySections = true;
            if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) {
                // Log the last skipped section's sequence number if it hasn't been logged before.
                log() << "recover final skipped journal section had sequence number "
                      << _lastSeqSkipped;
            }
            log() << "recover applying initial journal section with sequence number "
                  << h->seqNumber;
        }
    }

    unique_ptr<JournalSectionIterator> i;
    if (_recovering) {
        i = unique_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, p, len, _recovering));
    } else {
        i = unique_ptr<JournalSectionIterator>(
            new JournalSectionIterator(*h, /*after header*/ p, /*w/out header*/ len));
    }

    // we use a static so that we don't have to reallocate every time through. occasionally we
    // go back to a small allocation so that if there were a spiky growth it won't stick forever.
    static vector<ParsedJournalEntry> entries;
    entries.clear();
    /** TEMP uncomment
        RARELY OCCASIONALLY {
            if( entries.capacity() > 2048 ) {
                entries.shrink_to_fit();
                entries.reserve(2048);
            }
        }
    */

    // first read all entries to make sure this section is valid
    ParsedJournalEntry e;
    while (!i->atEof()) {
        i->next(e);
        entries.push_back(e);
    }

    // got all the entries for one group commit. apply them:
    applyEntries(txn, entries);
}

/** apply a specific journal file, that is already mmap'd
    @param p start of the memory mapped file
    @return true if this is detected to be the last file (ends abruptly)
*/
bool RecoveryJob::processFileBuffer(OperationContext* txn, const void* p, unsigned len) {
    try {
        unsigned long long fileId;
        BufReader br(p, len);

        {
            // read file header
            JHeader h;
            std::memset(&h, 0, sizeof(h));
            br.read(h);

            if (!h.valid()) {
                log() << "Journal file header invalid. This could indicate corruption, or "
                      << "an unclean shutdown while writing the first section in a journal "
                      << "file.";
                throw JournalSectionCorruptException();
            }

            if (!h.versionOk()) {
                log() << "journal file version number mismatch got:" << hex << h._version
                      << " expected:" << hex << (unsigned)JHeader::CurrentVersion
                      << ". if you have just upgraded, recover with old version of mongod, "
                         "terminate cleanly, then upgrade."
                      << endl;
                // Not using JournalSectionCorruptException as we don't want to ignore
                // journal files on upgrade.
                uasserted(13536, str::stream() << "journal version number mismatch " << h._version);
            }
            fileId = h.fileId;
            if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalDumpJournal) {
                log() << "JHeader::fileId=" << fileId << endl;
            }
        }

        // read sections
        while (!br.atEof()) {
            JSectHeader h;
            std::memset(&h, 0, sizeof(h));
            br.peek(h);
            if (h.fileId != fileId) {
                if (kDebugBuild ||
                    (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalDumpJournal)) {
                    log() << "Ending processFileBuffer at differing fileId want:" << fileId
                          << " got:" << h.fileId << endl;
                    log() << "  sect len:" << h.sectionLen() << " seqnum:" << h.seqNumber << endl;
                }
                return true;
            }
            unsigned slen = h.sectionLen();
            unsigned dataLen = slen - sizeof(JSectHeader) - sizeof(JSectFooter);
            const char* hdr = (const char*)br.skip(h.sectionLenWithPadding());
            const char* data = hdr + sizeof(JSectHeader);
            const char* footer = data + dataLen;
            processSection(txn, (const JSectHeader*)hdr, data, dataLen, (const JSectFooter*)footer);

            // ctrl c check
            uassert(ErrorCodes::Interrupted,
                    "interrupted during journal recovery",
                    !globalInShutdownDeprecated());
        }
    } catch (const BufReader::eof&) {
        if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalDumpJournal)
            log() << "ABRUPT END" << endl;
        return true;  // abrupt end
    } catch (const JournalSectionCorruptException&) {
        if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalDumpJournal)
            log() << "ABRUPT END" << endl;
        return true;  // abrupt end
    }

    return false;  // non-abrupt end
}
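// A note on the skip logic in processSection() above: with L = _lastDataSyncedFromLastRun,
// a section is skipped only when L > seqNumber + ExtraKeepTimeMs, i.e. it is comfortably
// older than the last datafile sync. Anything newer is re-applied; that should be safe
// because replaying a JEntry is a raw memcpy of the logged bytes, so applying a section
// twice yields the same datafile contents.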
/** apply a specific journal file */
bool RecoveryJob::processFile(OperationContext* txn, boost::filesystem::path journalfile) {
    log() << "recover " << journalfile.string() << endl;

    try {
        if (boost::filesystem::file_size(journalfile.string()) == 0) {
            log() << "recover info " << journalfile.string() << " has zero length" << endl;
            return true;
        }
    } catch (...) {
        // if something weird like a permissions problem, keep going so the massert down below
        // can happen (presumably)
        log() << "recover exception checking filesize" << endl;
    }

    MemoryMappedFile f{txn, MongoFile::Options::READONLY | MongoFile::Options::SEQUENTIAL};
    ON_BLOCK_EXIT([&f, &txn] {
        LockMongoFilesExclusive lock(txn);
        f.close(txn);
    });
    void* p = f.map(txn, journalfile.string().c_str());
    massert(13544, str::stream() << "recover error couldn't open " << journalfile.string(), p);
    return processFileBuffer(txn, p, (unsigned)f.length());
}

/** @param files all the j._0 style files we need to apply for recovery */
void RecoveryJob::go(OperationContext* txn, vector<boost::filesystem::path>& files) {
    log() << "recover begin" << endl;
    LockMongoFilesExclusive lkFiles(txn);  // for RecoveryJob::Last
    _recovering = true;

    // load the last sequence number synced to the datafiles on disk before the last crash
    _lastDataSyncedFromLastRun = journalReadLSN();
    log() << "recover lsn: " << _lastDataSyncedFromLastRun << endl;

    for (unsigned i = 0; i != files.size(); ++i) {
        bool abruptEnd = processFile(txn, files[i]);
        if (abruptEnd && i + 1 < files.size()) {
            log() << "recover error: abrupt end to file " << files[i].string()
                  << ", yet it isn't the last journal file" << endl;
            close(txn);
            uasserted(13535, "recover abrupt journal file end");
        }
    }

    if (_lastSeqSkipped && !_appliedAnySections) {
        log() << "recover journal replay completed without applying any sections. "
              << "This can happen if there were no writes after the last fsync of the data files. "
              << "Last skipped section had sequence number " << _lastSeqSkipped;
    }

    close(txn);

    if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalScanOnly) {
        uasserted(13545,
                  str::stream() << "--durOptions " << (int)MMAPV1Options::JournalScanOnly
                                << " (scan only) specified");
    }

    log() << "recover cleaning up" << endl;
    removeJournalFiles();
    log() << "recover done" << endl;
    okToCleanUp = true;
    _recovering = false;
}

void _recover(OperationContext* txn) {
    verify(storageGlobalParams.dur);

    boost::filesystem::path p = getJournalDir();
    if (!exists(p)) {
        log() << "directory " << p.string()
              << " does not exist, there will be no recovery startup step" << endl;
        okToCleanUp = true;
        return;
    }

    vector<boost::filesystem::path> journalFiles;
    getFiles(p, journalFiles);

    if (journalFiles.empty()) {
        log() << "recover : no journal files present, no recovery needed" << endl;
        okToCleanUp = true;
        return;
    }

    RecoveryJob::get().go(txn, journalFiles);
}

/** recover from a crash. called during startup. throws on error. */
void replayJournalFilesAtStartup() {
    // we use a lock so that exitCleanly will wait for us
    // to finish (or at least to notice what is up and stop)
    auto txn = cc().makeOperationContext();
    ScopedTransaction transaction(txn.get(), MODE_X);
    Lock::GlobalWrite lk(txn->lockState());

    _recover(txn.get());  // throws on interruption
}

struct BufReaderY {
    int a, b;
};
class BufReaderUnitTest : public StartupTest {
public:
    void run() {
        BufReader r((void*)"abcdabcdabcd", 12);
        char x;
        BufReaderY y;
        r.read(x);  // cout << x;  // a
        verify(x == 'a');
        r.read(y);
        r.read(x);
        verify(x == 'b');
    }
} brunittest;

}  // namespace dur
}  // namespace mongo