// @file dur_recover.cpp crash recovery via the journal
/**
* Copyright (C) 2009 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pch.h"
#include "dur.h"
#include "dur_stats.h"
#include "dur_recover.h"
#include "dur_journal.h"
#include "dur_journalformat.h"
#include "durop.h"
#include "namespace.h"
#include "../util/mongoutils/str.h"
#include "../util/bufreader.h"
#include "../util/concurrency/race.h"
#include "pdfile.h"
#include "database.h"
#include "db.h"
#include "../util/unittest.h"
#include "../util/checksum.h"
#include "cmdline.h"
#include "curop.h"
#include "mongommf.h"
#include "../util/compress.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <boost/filesystem/operations.hpp>
using namespace mongoutils;
namespace mongo {
namespace dur {
struct ParsedJournalEntry { /*copyable*/
ParsedJournalEntry() : e(0) { }
// relative path of database for the operation.
// might be a pointer into the mmapped journal file
const char *dbName;
// these are pointers into the memory mapped journal file
const JEntry *e; // local db sentinel is already parsed out here into dbName
// if not one of the two simple JEntry's above, this is the operation:
shared_ptr<DurOp> op;
};
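// After JournalSectionIterator::next() fills one of these in, either e is set
// (a basic write, with dbName naming the database) or op is set (a DurOp such
// as file-create or drop-db); RecoveryJob::applyEntry() dispatches on which.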
void removeJournalFiles();
boost::filesystem::path getJournalDir();
/** get journal filenames, in order. throws if unexpected content found */
static void getFiles(boost::filesystem::path dir, vector<boost::filesystem::path>& files) {
map<unsigned,boost::filesystem::path> m;
for ( boost::filesystem::directory_iterator i( dir );
i != boost::filesystem::directory_iterator();
++i ) {
boost::filesystem::path filepath = *i;
string fileName = boost::filesystem::path(*i).leaf();
if( str::startsWith(fileName, "j._") ) {
unsigned u = str::toUnsigned( str::after(fileName, '_') );
if( m.count(u) ) {
uasserted(13531, str::stream() << "unexpected files in journal directory " << dir.string() << " : " << fileName);
}
m.insert( pair<unsigned,boost::filesystem::path>(u,filepath) );
}
}
for( map<unsigned,boost::filesystem::path>::iterator i = m.begin(); i != m.end(); ++i ) {
if( i != m.begin() && m.count(i->first - 1) == 0 ) {
uasserted(13532,
str::stream() << "unexpected file in journal directory " << dir.string()
<< " : " << boost::filesystem::path(i->second).leaf() << " : can't find its preceding file");
}
files.push_back(i->second);
}
}
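// Illustrative example (names assumed, not from the original source): a healthy
// journal directory contains a contiguous sequence such as j._0, j._1, j._2.
// A duplicate number trips assertion 13531 above; a gap (j._0 and j._2 present
// but no j._1) trips 13532.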
/** read through the memory mapped data of a journal file (journal/j._<n> file)
throws
*/
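// Sketch of the on-disk section layout this iterator walks (per
// dur_journalformat.h):
//   JSectHeader | JDbContext / DurOp / JEntry records ... | JSectFooter
// During recovery the record region arrives compressed (see
// ../util/compress.h) and is inflated into _uncompressed first.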
class JournalSectionIterator : boost::noncopyable {
auto_ptr<BufReader> _entries;
const JSectHeader _h;
const char *_lastDbName; // pointer into the mmapped journal file
const bool _doDurOps;
string _uncompressed;
public:
JournalSectionIterator(const JSectHeader& h, const void *compressed, unsigned compressedLen, bool doDurOpsRecovering) :
_h(h),
_lastDbName(0)
, _doDurOps(doDurOpsRecovering)
{
assert( doDurOpsRecovering );
bool ok = uncompress((const char *)compressed, compressedLen, &_uncompressed);
if( !ok ) {
// it should always be ok (i think?) as there is a previous check to see that the JSectFooter is ok
log() << "couldn't uncompress journal section" << endl;
msgasserted(15874, "couldn't uncompress journal section");
}
const char *p = _uncompressed.c_str();
assert( compressedLen == _h.sectionLen() - sizeof(JSectFooter) - sizeof(JSectHeader) );
_entries = auto_ptr<BufReader>( new BufReader(p, _uncompressed.size()) );
}
// we work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
JournalSectionIterator(const JSectHeader &h, const void *p, unsigned len) :
_entries( new BufReader((const char *) p, len) ),
_h(h),
_lastDbName(0)
, _doDurOps(false)
{ }
bool atEof() const { return _entries->atEof(); }
unsigned long long seqNumber() const { return _h.seqNumber; }
/** get the next entry from the log. this function parses and combines JDbContext and JEntry's.
* throws on premature end of section.
*/
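// Note: the leading unsigned of each record is overloaded. Values above
// JEntry::OpCode_Min are opcodes (Footer, FileCreated, DropDb, DbContext);
// smaller nonzero values are the 'len' field of a basic-write JEntry.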
void next(ParsedJournalEntry& e) {
unsigned lenOrOpCode;
_entries->read(lenOrOpCode);
if (lenOrOpCode > JEntry::OpCode_Min) {
switch( lenOrOpCode ) {
case JEntry::OpCode_Footer: {
assert( false );
}
case JEntry::OpCode_FileCreated:
case JEntry::OpCode_DropDb: {
e.dbName = 0;
e.e = 0; // clear any stale basic-write pointer; e is reused across next() calls
boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_entries);
if (_doDurOps) {
e.op = op;
}
return;
}
case JEntry::OpCode_DbContext: {
_lastDbName = (const char*) _entries->pos();
const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _entries->remaining());
const unsigned len = strnlen(_lastDbName, limit);
massert(13533, "problem processing journal file during recovery", _lastDbName[len] == '\0');
_entries->skip(len+1); // skip '\0' too
_entries->read(lenOrOpCode); // read this for the fall through
}
// fall through as a basic operation always follows JDbContext, and we don't have anything to return yet
default:
// fall through
;
}
}
// JEntry - a basic write
assert( lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min );
_entries->rewind(4);
e.e = (JEntry *) _entries->skip(sizeof(JEntry));
e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName;
assert( e.e->len == lenOrOpCode );
_entries->skip(e.e->len);
}
};
static string fileName(const char* dbName, int fileNo) {
stringstream ss;
ss << dbName << '.';
assert( fileNo >= 0 );
if( fileNo == JEntry::DotNsSuffix )
ss << "ns";
else
ss << fileNo;
// relative name -> full path name
boost::filesystem::path full(dbpath);
full /= ss.str();
return full.string();
}
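// Illustrative usage (values assumed, not from the original source): with
// dbpath "/data/db", fileName("test", 3) yields "/data/db/test.3" and
// fileName("test", JEntry::DotNsSuffix) yields "/data/db/test.ns".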
RecoveryJob::~RecoveryJob() {
DESTRUCTOR_GUARD(
if( !_mmfs.empty() )
close();
)
}
void RecoveryJob::close() {
scoped_lock lk(_mx);
_close();
}
void RecoveryJob::_close() {
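// flush all mapped views to disk, then release our MongoMMF references,
// closing any data files this recovery job opened.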
MongoFile::flushAll(true);
_mmfs.clear();
}
void RecoveryJob::write(const ParsedJournalEntry& entry) {
//TODO(mathias): look into making some of these dasserts
assert(entry.e);
assert(entry.dbName);
assert(strnlen(entry.dbName, MaxDatabaseNameLen) < MaxDatabaseNameLen);
const string fn = fileName(entry.dbName, entry.e->getFileNo());
MongoFile* file;
{
MongoFileFinder finder; // must release lock before creating new MongoMMF
file = finder.findByPath(fn);
}
MongoMMF* mmf;
if (file) {
assert(file->isMongoMMF());
mmf = (MongoMMF*)file;
}
else {
if( !_recovering ) {
log() << "journal error applying writes, file " << fn << " is not open" << endl;
assert(false);
}
boost::shared_ptr<MongoMMF> sp (new MongoMMF);
assert(sp->open(fn, false));
_mmfs.push_back(sp);
mmf = sp.get();
}
if ((entry.e->ofs + entry.e->len) <= mmf->length()) {
assert(mmf->view_write());
assert(entry.e->srcData());
void* dest = (char*)mmf->view_write() + entry.e->ofs;
memcpy(dest, entry.e->srcData(), entry.e->len);
stats.curr->_writeToDataFilesBytes += entry.e->len;
}
else {
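// (assumed rationale, for clarity) during recovery a basic write can point
// past the current end of the data file, presumably because a later journaled
// op shrinks or removes the file; it is skipped here. Outside recovery this
// would indicate corruption, hence the massert.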
massert(13622, "Trying to write past end of file in WRITETODATAFILES", _recovering);
}
}
void RecoveryJob::applyEntry(const ParsedJournalEntry& entry, bool apply, bool dump) {
if( entry.e ) {
if( dump ) {
stringstream ss;
ss << " BASICWRITE " << setw(20) << entry.dbName << '.';
if( entry.e->isNsSuffix() )
ss << "ns";
else
ss << setw(2) << entry.e->getFileNo();
ss << ' ' << setw(6) << entry.e->len << ' ' << /*hex << setw(8) << (size_t) fqe.srcData << dec <<*/
" " << hexdump(entry.e->srcData(), entry.e->len);
log() << ss.str() << endl;
}
if( apply ) {
write(entry);
}
}
else if(entry.op) {
// a DurOp subclass operation
if( dump ) {
log() << " OP " << entry.op->toString() << endl;
}
if( apply ) {
if( entry.op->needFilesClosed() ) {
_close(); // locked in processSection
}
entry.op->replay();
}
}
}
void RecoveryJob::applyEntries(const vector<ParsedJournalEntry> &entries) {
bool apply = (cmdLine.durOptions & CmdLine::DurScanOnly) == 0;
bool dump = cmdLine.durOptions & CmdLine::DurDumpJournal;
if( dump )
log() << "BEGIN section" << endl;
for( vector<ParsedJournalEntry>::const_iterator i = entries.begin(); i != entries.end(); ++i ) {
applyEntry(*i, apply, dump);
}
if( dump )
log() << "END section" << endl;
}
void RecoveryJob::processSection(const JSectHeader *h, const void *p, unsigned len, const JSectFooter *f) {
scoped_lock lk(_mx);
RACECHECK
/** todo: we should really verify the checksum to see that seqNumber is ok?
that is expensive; maybe there could be some sort of checksum of just the header
within the header itself
*/
if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs ) {
if( h->seqNumber != _lastSeqMentionedInConsoleLog ) {
static int n;
if( ++n < 10 ) {
log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl;
}
else if( n == 10 ) {
log() << "recover skipping application of section more..." << endl;
}
_lastSeqMentionedInConsoleLog = h->seqNumber;
}
return;
}
auto_ptr<JournalSectionIterator> i;
if( _recovering ) {
i = auto_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, p, len, _recovering));
}
else {
i = auto_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, /*after header*/p, /*w/out header*/len));
}
// we use a static so that we don't have to reallocate every time through. occasionally we
// go back to a small allocation so that spiky growth won't stick forever.
static vector<ParsedJournalEntry> entries;
entries.clear();
/** TEMP uncomment
RARELY OCCASIONALLY {
if( entries.capacity() > 2048 ) {
entries.shrink_to_fit();
entries.reserve(2048);
}
}
*/
// first read all entries to make sure this section is valid
ParsedJournalEntry e;
while( !i->atEof() ) {
i->next(e);
entries.push_back(e);
}
// after the entries check the footer checksum
if( _recovering ) {
assert( ((const char *)h) + sizeof(JSectHeader) == p );
if( !f->checkHash(h, len + sizeof(JSectHeader)) ) {
msgasserted(13594, "journal checksum doesn't match");
}
}
// got all the entries for one group commit. apply them:
applyEntries(entries);
}
/** apply a specific journal file, that is already mmap'd
@param p start of the memory mapped file
@return true if this is detected to be the last file (ends abruptly)
*/
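// File layout walked below (per dur_journalformat.h):
//   JHeader | section | section | ... (until EOF, abrupt end, or fileId change)
// Each section occupies sectionLenWithPadding() bytes on disk and parses as
//   JSectHeader | data | JSectFooter, with dataLen = sectionLen() minus the
//   sizes of the section header and footer.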
bool RecoveryJob::processFileBuffer(const void *p, unsigned len) {
try {
unsigned long long fileId;
BufReader br(p,len);
{
// read file header
JHeader h;
br.read(h);
/* [dm] not automatically handled. we should eventually handle this automatically. i think:
(1) if this is the final journal file
(2) and the file size is just the file header in length (or less) -- this is a bit tricky to determine if prealloced
then can just assume recovery ended cleanly and not error out (still should log).
*/
uassert(13537,
"journal file header invalid. This could indicate corruption in a journal file, or perhaps a crash where sectors in the file header were in flight and written out of order at time of crash (unlikely but possible).",
h.valid());
if( !h.versionOk() ) {
log() << "journal file version number mismatch got:" << hex << h._version
<< " expected:" << hex << (unsigned) JHeader::CurrentVersion
<< ". if you have just upgraded, recover with old version of mongod, terminate cleanly, then upgrade."
<< endl;
uasserted(13536, str::stream() << "journal version number mismatch " << h._version);
}
fileId = h.fileId;
if(cmdLine.durOptions & CmdLine::DurDumpJournal) {
log() << "JHeader::fileId=" << fileId << endl;
}
}
// read sections
while ( !br.atEof() ) {
JSectHeader h;
br.peek(h);
if( h.fileId != fileId ) {
if( debug || (cmdLine.durOptions & CmdLine::DurDumpJournal) ) {
log() << "Ending processFileBuffer at differing fileId want:" << fileId << " got:" << h.fileId << endl;
log() << " sect len:" << h.sectionLen() << " seqnum:" << h.seqNumber << endl;
}
return true;
}
unsigned slen = h.sectionLen();
unsigned dataLen = slen - sizeof(JSectHeader) - sizeof(JSectFooter);
const char *hdr = (const char *) br.skip(h.sectionLenWithPadding());
const char *data = hdr + sizeof(JSectHeader);
const char *footer = data + dataLen;
processSection((const JSectHeader*) hdr, data, dataLen, (const JSectFooter*) footer);
// ctrl c check
killCurrentOp.checkForInterrupt(false);
}
}
catch( BufReader::eof& ) {
if( cmdLine.durOptions & CmdLine::DurDumpJournal )
log() << "ABRUPT END" << endl;
return true; // abrupt end
}
return false; // non-abrupt end
}
/** apply a specific journal file */
bool RecoveryJob::processFile(boost::filesystem::path journalfile) {
log() << "recover " << journalfile.string() << endl;
try {
if( boost::filesystem::file_size( journalfile.string() ) == 0 ) {
log() << "recover info " << journalfile.string() << " has zero length" << endl;
return true;
}
} catch(...) {
// if something weird like a permissions problem occurs, keep going so the massert below can produce a useful error (presumably)
log() << "recover exception checking filesize" << endl;
}
MemoryMappedFile f;
void *p = f.mapWithOptions(journalfile.string().c_str(), MongoFile::READONLY | MongoFile::SEQUENTIAL);
massert(13544, str::stream() << "recover error couldn't open " << journalfile.string(), p);
return processFileBuffer(p, (unsigned) f.length());
}
/** @param files all the j._0 style files we need to apply for recovery */
void RecoveryJob::go(vector<boost::filesystem::path>& files) {
log() << "recover begin" << endl;
_recovering = true;
// load the last sequence number synced to the datafiles on disk before the last crash
_lastDataSyncedFromLastRun = journalReadLSN();
log() << "recover lsn: " << _lastDataSyncedFromLastRun << endl;
for( unsigned i = 0; i != files.size(); ++i ) {
bool abruptEnd = processFile(files[i]);
if( abruptEnd && i+1 < files.size() ) {
log() << "recover error: abrupt end to file " << files[i].string() << ", yet it isn't the last journal file" << endl;
close();
uasserted(13535, "recover abrupt journal file end");
}
}
close();
if( cmdLine.durOptions & CmdLine::DurScanOnly ) {
uasserted(13545, str::stream() << "--durOptions " << (int) CmdLine::DurScanOnly << " (scan only) specified");
}
log() << "recover cleaning up" << endl;
removeJournalFiles();
log() << "recover done" << endl;
okToCleanUp = true;
_recovering = false;
}
void _recover() {
assert( cmdLine.dur );
boost::filesystem::path p = getJournalDir();
if( !exists(p) ) {
log() << "directory " << p.string() << " does not exist, there will be no recovery startup step" << endl;
okToCleanUp = true;
return;
}
vector<boost::filesystem::path> journalFiles;
getFiles(p, journalFiles);
if( journalFiles.empty() ) {
log() << "recover : no journal files present, no recovery needed" << endl;
okToCleanUp = true;
return;
}
RecoveryJob::get().go(journalFiles);
}
extern mutex groupCommitMutex;
/** recover from a crash
called during startup
throws on error
*/
void recover() {
// we use a lock so that exitCleanly will wait for us
// to finish (or at least to notice what is up and stop)
writelock lk;
// this is so the mutexdebugger doesn't get confused. we are actually single threaded
// at this point in the program so it wouldn't have been a true problem (I think)
scoped_lock lk2(groupCommitMutex);
_recover(); // throws on interruption
}
struct BufReaderY { int a,b; };
class BufReaderUnitTest : public UnitTest {
public:
void run() {
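// sanity check: reading one char then the 8-byte BufReaderY (on typical
// platforms where int is 4 bytes) must advance the cursor 9 bytes,
// landing on the 'b' at offset 9 of "abcdabcdabcd".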
BufReader r((void*) "abcdabcdabcd", 12);
char x;
BufReaderY y;
r.read(x); //cout << x; // a
assert( x == 'a' );
r.read(y);
r.read(x);
assert( x == 'b' );
}
} brunittest;
// can't free at termination because order of destruction of global vars is arbitrary
RecoveryJob &RecoveryJob::_instance = *(new RecoveryJob());
} // namespace dur
} // namespace mongo