// @file dur_recover.cpp crash recovery via the journal /** * Copyright (C) 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "pch.h" #include "dur.h" #include "dur_stats.h" #include "dur_recover.h" #include "dur_journal.h" #include "dur_journalformat.h" #include "durop.h" #include "namespace.h" #include "../util/mongoutils/str.h" #include "../util/bufreader.h" #include "pdfile.h" #include "database.h" #include "db.h" #include "../util/unittest.h" #include "../util/checksum.h" #include "cmdline.h" #include "curop.h" #include "mongommf.h" #include #include using namespace mongoutils; namespace mongo { namespace dur { struct ParsedJournalEntry { /*copyable*/ ParsedJournalEntry() : e(0) { } // relative path of database for the operation. // might be a pointer into mmaped Journal file const char *dbName; // thse are pointers into the memory mapped journal file const JEntry *e; // local db sentinel is already parsed out here into dbName // if not one of the two simple JEntry's above, this is the operation: shared_ptr op; }; void removeJournalFiles(); path getJournalDir(); /** get journal filenames, in order. throws if unexpected content found */ static void getFiles(path dir, vector& files) { map m; for ( filesystem::directory_iterator i( dir ); i != filesystem::directory_iterator(); ++i ) { filesystem::path filepath = *i; string fileName = filesystem::path(*i).leaf(); if( str::startsWith(fileName, "j._") ) { unsigned u = str::toUnsigned( str::after(fileName, '_') ); if( m.count(u) ) { uasserted(13531, str::stream() << "unexpected files in journal directory " << dir.string() << " : " << fileName); } m.insert( pair(u,filepath) ); } } for( map::iterator i = m.begin(); i != m.end(); ++i ) { if( i != m.begin() && m.count(i->first - 1) == 0 ) { uasserted(13532, str::stream() << "unexpected file in journal directory " << dir.string() << " : " << filesystem::path(i->second).leaf() << " : can't find its preceeding file"); } files.push_back(i->second); } } /** read through the memory mapped data of a journal file (journal/j._ file) throws */ class JournalSectionIterator : boost::noncopyable { public: JournalSectionIterator(const void *p, unsigned len, bool doDurOps) : _br(p, len) , _sectHead(static_cast(_br.skip(sizeof(JSectHeader)))) , _lastDbName(NULL) , _doDurOps(doDurOps) {} bool atEof() const { return _br.atEof(); } unsigned long long seqNumber() const { return _sectHead->seqNumber; } /** get the next entry from the log. this function parses and combines JDbContext and JEntry's. * @return true if got an entry. false at successful end of section (and no entry returned). * throws on premature end of section. */ bool next(ParsedJournalEntry& e) { unsigned lenOrOpCode; _br.read(lenOrOpCode); if (lenOrOpCode > JEntry::OpCode_Min) { switch( lenOrOpCode ) { case JEntry::OpCode_Footer: { if (_doDurOps) { const char* pos = (const char*) _br.pos(); pos -= sizeof(lenOrOpCode); // rewind to include OpCode const JSectFooter& footer = *(const JSectFooter*)pos; int len = pos - (char*)_sectHead; if (!footer.checkHash(_sectHead, len)) { massert(13594, "journal checksum doesn't match", false); } } return false; // false return value denotes end of section } case JEntry::OpCode_FileCreated: case JEntry::OpCode_DropDb: { e.dbName = 0; boost::shared_ptr op = DurOp::read(lenOrOpCode, _br); if (_doDurOps) { e.op = op; } return true; } case JEntry::OpCode_DbContext: { _lastDbName = (const char*) _br.pos(); const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _br.remaining()); const unsigned len = strnlen(_lastDbName, limit); massert(13533, "problem processing journal file during recovery", _lastDbName[len] == '\0'); _br.skip(len+1); // skip '\0' too _br.read(lenOrOpCode); } // fall through as a basic operation always follows jdbcontext, and we don't have anything to return yet default: // fall through ; } } // JEntry - a basic write assert( lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min ); _br.rewind(4); e.e = (JEntry *) _br.skip(sizeof(JEntry)); e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName; assert( e.e->len == lenOrOpCode ); _br.skip(e.e->len); return true; } private: BufReader _br; const JSectHeader* _sectHead; const char *_lastDbName; // pointer into mmaped journal file const bool _doDurOps; }; static string fileName(const char* dbName, int fileNo) { stringstream ss; ss << dbName << '.'; assert( fileNo >= 0 ); if( fileNo == JEntry::DotNsSuffix ) ss << "ns"; else ss << fileNo; // relative name -> full path name path full(dbpath); full /= ss.str(); return full.string(); } RecoveryJob::~RecoveryJob() { DESTRUCTOR_GUARD( if( !_mmfs.empty() ) close(); ) } void RecoveryJob::close() { scoped_lock lk(_mx); _close(); } void RecoveryJob::_close() { MongoFile::flushAll(true); _mmfs.clear(); } void RecoveryJob::write(const ParsedJournalEntry& entry) { //TODO(mathias): look into making some of these dasserts assert(entry.e); assert(entry.dbName); assert(strnlen(entry.dbName, MaxDatabaseNameLen) < MaxDatabaseNameLen); const string fn = fileName(entry.dbName, entry.e->getFileNo()); MongoFile* file; { MongoFileFinder finder; // must release lock before creating new MongoMMF file = finder.findByPath(fn); } MongoMMF* mmf; if (file) { assert(file->isMongoMMF()); mmf = (MongoMMF*)file; } else { assert(_recovering); boost::shared_ptr sp (new MongoMMF); assert(sp->open(fn, false)); _mmfs.push_back(sp); mmf = sp.get(); } if ((entry.e->ofs + entry.e->len) <= mmf->length()) { assert(mmf->view_write()); assert(entry.e->srcData()); void* dest = (char*)mmf->view_write() + entry.e->ofs; memcpy(dest, entry.e->srcData(), entry.e->len); stats.curr->_writeToDataFilesBytes += entry.e->len; } else { massert(13622, "Trying to write past end of file in WRITETODATAFILES", _recovering); } } void RecoveryJob::applyEntry(const ParsedJournalEntry& entry, bool apply, bool dump) { if( entry.e ) { if( dump ) { stringstream ss; ss << " BASICWRITE " << setw(20) << entry.dbName << '.'; if( entry.e->isNsSuffix() ) ss << "ns"; else ss << setw(2) << entry.e->getFileNo(); ss << ' ' << setw(6) << entry.e->len << ' ' << /*hex << setw(8) << (size_t) fqe.srcData << dec <<*/ " " << hexdump(entry.e->srcData(), entry.e->len); log() << ss.str() << endl; } if( apply ) { write(entry); } } else if(entry.op) { // a DurOp subclass operation if( dump ) { log() << " OP " << entry.op->toString() << endl; } if( apply ) { if( entry.op->needFilesClosed() ) { _close(); // locked in processSection } entry.op->replay(); } } } void RecoveryJob::applyEntries(const vector &entries) { bool apply = (cmdLine.durOptions & CmdLine::DurScanOnly) == 0; bool dump = cmdLine.durOptions & CmdLine::DurDumpJournal; if( dump ) log() << "BEGIN section" << endl; for( vector::const_iterator i = entries.begin(); i != entries.end(); ++i ) { applyEntry(*i, apply, dump); } if( dump ) log() << "END section" << endl; } void RecoveryJob::processSection(const void *p, unsigned len) { scoped_lock lk(_mx); vector entries; JournalSectionIterator i(p, len, _recovering); //DEV log() << "recovery processSection seq:" << i.seqNumber() << endl; if( _recovering && _lastDataSyncedFromLastRun > i.seqNumber() + ExtraKeepTimeMs ) { if( i.seqNumber() != _lastSeqMentionedInConsoleLog ) { log() << "recover skipping application of section seq:" << i.seqNumber() << " < lsn:" << _lastDataSyncedFromLastRun << endl; _lastSeqMentionedInConsoleLog = i.seqNumber(); } return; } // first read all entries to make sure this section is valid ParsedJournalEntry e; while( i.next(e) ) { entries.push_back(e); } // got all the entries for one group commit. apply them: applyEntries(entries); } /** apply a specific journal file, that is already mmap'd @param p start of the memory mapped file @return true if this is detected to be the last file (ends abruptly) */ bool RecoveryJob::processFileBuffer(const void *p, unsigned len) { try { unsigned long long fileId; BufReader br(p,len); { // read file header JHeader h; br.read(h); if( !h.versionOk() ) { log() << "journal file version number mismatch. recover with old version of mongod, terminate cleanly, then upgrade." << endl; uasserted(13536, str::stream() << "journal version number mismatch " << h._version); } uassert(13537, "journal header invalid", h.valid()); fileId = h.fileId; if(cmdLine.durOptions & CmdLine::DurDumpJournal) { log() << "JHeader::fileId=" << fileId << endl; } } // read sections while ( !br.atEof() ) { JSectHeader h; br.peek(h); if( h.fileId != fileId ) { if( debug || (cmdLine.durOptions & CmdLine::DurDumpJournal) ) { log() << "Ending processFileBuffer at differing fileId want:" << fileId << " got:" << h.fileId << endl; log() << " sect len:" << h.len << " seqnum:" << h.seqNumber << endl; } return true; } processSection(br.skip(h.len), h.len); // ctrl c check killCurrentOp.checkForInterrupt(false); } } catch( BufReader::eof& ) { if( cmdLine.durOptions & CmdLine::DurDumpJournal ) log() << "ABRUPT END" << endl; return true; // abrupt end } return false; // non-abrupt end } /** apply a specific journal file */ bool RecoveryJob::processFile(path journalfile) { log() << "recover " << journalfile.string() << endl; try { if( boost::filesystem::file_size( journalfile.string() ) == 0 ) { log() << "recover info " << journalfile.string() << " has zero length" << endl; return true; } } catch(...) { // if something weird like a permissions problem keep going so the massert down below can happen (presumably) log() << "recover exception checking filesize" << endl; } MemoryMappedFile f; void *p = f.mapWithOptions(journalfile.string().c_str(), MongoFile::READONLY | MongoFile::SEQUENTIAL); massert(13544, str::stream() << "recover error couldn't open " << journalfile.string(), p); return processFileBuffer(p, (unsigned) f.length()); } /** @param files all the j._0 style files we need to apply for recovery */ void RecoveryJob::go(vector& files) { log() << "recover begin" << endl; _recovering = true; // load the last sequence number synced to the datafiles on disk before the last crash _lastDataSyncedFromLastRun = journalReadLSN(); log() << "recover lsn: " << _lastDataSyncedFromLastRun << endl; for( unsigned i = 0; i != files.size(); ++i ) { /*bool abruptEnd = */processFile(files[i]); /*if( abruptEnd && i+1 < files.size() ) { log() << "recover error: abrupt end to file " << files[i].string() << ", yet it isn't the last journal file" << endl; close(); uasserted(13535, "recover abrupt journal file end"); }*/ } close(); if( cmdLine.durOptions & CmdLine::DurScanOnly ) { uasserted(13545, str::stream() << "--durOptions " << (int) CmdLine::DurScanOnly << " (scan only) specified"); } log() << "recover cleaning up" << endl; removeJournalFiles(); log() << "recover done" << endl; okToCleanUp = true; _recovering = false; } void _recover() { assert( cmdLine.dur ); filesystem::path p = getJournalDir(); if( !exists(p) ) { log() << "directory " << p.string() << " does not exist, there will be no recovery startup step" << endl; okToCleanUp = true; return; } vector journalFiles; getFiles(p, journalFiles); if( journalFiles.empty() ) { log() << "recover : no journal files present, no recovery needed" << endl; okToCleanUp = true; return; } RecoveryJob::get().go(journalFiles); } extern mutex groupCommitMutex; /** recover from a crash called during startup throws on error */ void recover() { // we use a lock so that exitCleanly will wait for us // to finish (or at least to notice what is up and stop) writelock lk; // this is so the mutexdebugger doesn't get confused. we are actually single threaded // at this point in the program so it wouldn't have been a true problem (I think) scoped_lock lk2(groupCommitMutex); _recover(); // throws on interruption } struct BufReaderY { int a,b; }; class BufReaderUnitTest : public UnitTest { public: void run() { BufReader r((void*) "abcdabcdabcd", 12); char x; BufReaderY y; r.read(x); //cout << x; // a assert( x == 'a' ); r.read(y); r.read(x); assert( x == 'b' ); } } brunittest; // can't free at termination because order of destruction of global vars is arbitrary RecoveryJob &RecoveryJob::_instance = *(new RecoveryJob()); } // namespace dur } // namespace mongo