diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/db.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/dur_journal.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/dur_journal.h | 25 | ||||
-rw-r--r-- | src/mongo/db/dur_journalimpl.h | 25 | ||||
-rw-r--r-- | src/mongo/db/dur_preplogbuffer.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/dur_recover.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/dur_recover.h | 9 | ||||
-rw-r--r-- | src/mongo/db/dur_writetodatafiles.cpp | 2 | ||||
-rw-r--r-- | src/mongo/util/mmap.cpp | 11 | ||||
-rw-r--r-- | src/mongo/util/mmap.h | 4 |
10 files changed, 144 insertions, 74 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 90a8013dba2..05347255f52 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -53,6 +53,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/dbwebserver.h" #include "mongo/db/dur.h" +#include "mongo/db/dur_journal.h" #include "mongo/db/index_names.h" #include "mongo/db/index_rebuilder.h" #include "mongo/db/initialize_server_global_state.h" @@ -549,7 +550,9 @@ namespace mongo { } Date_t start = jsTime(); + dur::notifyPreDataFileFlush(); int numFiles = MemoryMappedFile::flushAll( true ); + dur::notifyPostDataFileFlush(); time_flushing = (int) (jsTime() - start); _flushed(time_flushing); diff --git a/src/mongo/db/dur_journal.cpp b/src/mongo/db/dur_journal.cpp index a19032bc4c0..0f72a3901c1 100644 --- a/src/mongo/db/dur_journal.cpp +++ b/src/mongo/db/dur_journal.cpp @@ -187,9 +187,10 @@ namespace mongo { _nextFileNumber = 0; _curLogFile = 0; _curFileId = 0; - _preFlushTime = 0; - _lastFlushTime = 0; - _writeToLSNNeeded = false; + _lastSeqNumberWrittenToSharedView.store(0); + _preFlushTime.store(0); + _lastFlushTime.store(0); + _writeToLSNNeeded.store(false); } boost::filesystem::path Journal::getFilePathFor(int filenumber) const { @@ -545,12 +546,9 @@ namespace mongo { void Journal::init() { verify( _curLogFile == 0 ); - MongoFile::notifyPreFlush = preFlush; - MongoFile::notifyPostFlush = postFlush; } void Journal::open() { - verify( MongoFile::notifyPreFlush == preFlush ); SimpleMutex::scoped_lock lk(_curLogFileMutex); _open(); } @@ -603,48 +601,78 @@ namespace mongo { return 0; } - unsigned long long getLastDataFileFlushTime() { - return j.lastFlushTime(); - } - /** remember "last sequence number" to speed recoveries concurrency: called by durThread only. */ - void Journal::updateLSNFile() { + void Journal::updateLSNFile(unsigned long long lsnOfCurrentJournalEntry) { RACECHECK - if( !_writeToLSNNeeded ) + if (!_writeToLSNNeeded.load()) return; - _writeToLSNNeeded = false; + _writeToLSNNeeded.store(false); try { + // Don't read from _lastFlushTime again in this function since it may change. + const uint64_t copyOfLastFlushTime = _lastFlushTime.load(); + + // Only write an LSN that is older than the journal entry we are in the middle of writing. + // If this trips, it means that _lastFlushTime got ahead of what is actually in the data + // files because lsnOfCurrentJournalEntry includes data that hasn't yet been written to the + // data files. + if (copyOfLastFlushTime >= lsnOfCurrentJournalEntry) { + severe() << "Attempting to update LSNFile to " << copyOfLastFlushTime + << " which is not older than the current journal sequence number " + << lsnOfCurrentJournalEntry; + fassertFailed(34370); + } + // os can flush as it likes. if it flushes slowly, we will just do extra work on recovery. // however, given we actually close the file, that seems unlikely. File f; f.open(lsnPath().string().c_str()); - if( !f.is_open() ) { + if (!f.is_open()) { // can get 0 if an i/o error log() << "warning: open of lsn file failed" << endl; return; } - LOG(1) << "lsn set " << _lastFlushTime << endl; + LOG(1) << "lsn set " << copyOfLastFlushTime << endl; LSNFile lsnf; - lsnf.set(_lastFlushTime); + lsnf.set(copyOfLastFlushTime); f.write(0, (char*)&lsnf, sizeof(lsnf)); // do we want to fsync here? if we do it probably needs to be async so the durthread // is not delayed. - } - catch(std::exception& e) { + } catch (std::exception& e) { log() << "warning: write to lsn file failed " << e.what() << endl; // keep running (ignore the error). recovery will be slow. } } - void Journal::preFlush() { - j._preFlushTime = Listener::getElapsedTimeMillis(); + namespace { + SimpleMutex lastGeneratedSeqNumberMutex("lastGeneratedSeqNumberMutex"); + uint64_t lastGeneratedSeqNumber = 0; + } + + uint64_t generateNextSeqNumber() { + const uint64_t now = Listener::getElapsedTimeMillis(); + SimpleMutex::scoped_lock lock(lastGeneratedSeqNumberMutex); + if (now > lastGeneratedSeqNumber) { + lastGeneratedSeqNumber = now; + } else { + // Make sure we return unique monotonically increasing numbers. + lastGeneratedSeqNumber++; + } + return lastGeneratedSeqNumber; + } + + void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber) { + j._lastSeqNumberWrittenToSharedView.store(seqNumber); + } + + void notifyPreDataFileFlush() { + j._preFlushTime.store(j._lastSeqNumberWrittenToSharedView.load()); } - void Journal::postFlush() { - j._lastFlushTime = j._preFlushTime; - j._writeToLSNNeeded = true; + void notifyPostDataFileFlush() { + j._lastFlushTime.store(j._preFlushTime.load()); + j._writeToLSNNeeded.store(true); } // call from within _curLogFileMutex @@ -654,7 +682,7 @@ namespace mongo { JFile jf; jf.filename = _curLogFile->_name; - jf.lastEventTimeMs = Listener::getElapsedTimeMillis(); + jf.lastEventTimeMs = generateNextSeqNumber(); _oldJournalFiles.push_back(jf); delete _curLogFile; // close @@ -669,11 +697,11 @@ namespace mongo { while( !_oldJournalFiles.empty() ) { JFile f = _oldJournalFiles.front(); - // 'f.lastEventTimeMs' is the timestamp of the last thing in the journal file. - // '_lastFlushTime' is the start time of the last successful flush of the data - // files to disk. We can't delete this journal file until the last successful - // flush time is at least 10 seconds after 'f.lastEventTimeMs'. - if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime) { + // 'f.lastEventTimeMs' is the timestamp of the last thing in the journal file. + // '_lastFlushTime' is the start time of the last successful flush of the data + // files to disk. We can't delete this journal file until the last successful + // flush time is at least 10 seconds after 'f.lastEventTimeMs'. + if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime.load()) { // eligible for deletion boost::filesystem::path p( f.filename ); log() << "old journal file will be removed: " << f.filename << endl; @@ -687,7 +715,7 @@ namespace mongo { } } - void Journal::_rotate() { + void Journal::_rotate(unsigned long long lsnOfCurrentJournalEntry) { RACECHECK; @@ -696,7 +724,7 @@ namespace mongo { if ( inShutdown() || !_curLogFile ) return; - j.updateLSNFile(); + j.updateLSNFile(lsnOfCurrentJournalEntry); if( _curLogFile && _written < DataLimitPerJournalFile ) return; @@ -785,7 +813,7 @@ namespace mongo { verify( w <= L ); stats.curr->_journaledBytes += L; _curLogFile->synchronousAppend((const void *) b.buf(), L); - _rotate(); + _rotate(h.seqNumber); } catch(std::exception& e) { log() << "error exception in dur::journal " << e.what() << endl; diff --git a/src/mongo/db/dur_journal.h b/src/mongo/db/dur_journal.h index 9e7fb5bf4b6..f595f05c3ee 100644 --- a/src/mongo/db/dur_journal.h +++ b/src/mongo/db/dur_journal.h @@ -30,6 +30,8 @@ #pragma once +#include <mongo/platform/cstdint.h> + namespace mongo { class AlignedBuilder; @@ -49,12 +51,17 @@ namespace mongo { /** assure journal/ dir exists. throws */ void journalMakeDir(); - /** check if time to rotate files; assure a file is open. - done separately from the journal() call as we can do this part - outside of lock. - only called by durThread. + /** + * Generates the next sequence number for use in the journal, guaranteed to be greater than all + * prior sequence numbers. + */ + uint64_t generateNextSeqNumber(); + + /** + * Informs the journaling system that all writes on or before the passed in sequence number have + * been written to the data files' shared mmap view. */ - void journalRotate(); + void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber); /** flag that something has gone wrong during writing to the journal (not for recovery mode) @@ -64,8 +71,6 @@ namespace mongo { /** read lsn from disk from the last run before doing recovery */ unsigned long long journalReadLSN(); - unsigned long long getLastDataFileFlushTime(); - /** never throws. @param anyFiles by default we only look at j._* files. If anyFiles is true, return true if there are any files in the journal directory. acquirePathLock() uses this to @@ -79,5 +84,11 @@ namespace mongo { const unsigned JournalCommitIntervalDefault = 100; + /** + * Call these before (pre) and after (post) the datafiles are flushed to disk by the DataFileSync + * thread. These should not be called for any other flushes. + */ + void notifyPreDataFileFlush(); + void notifyPostDataFileFlush(); } } diff --git a/src/mongo/db/dur_journalimpl.h b/src/mongo/db/dur_journalimpl.h index bafc0570f19..0c12a6677d5 100644 --- a/src/mongo/db/dur_journalimpl.h +++ b/src/mongo/db/dur_journalimpl.h @@ -30,7 +30,12 @@ #pragma once +#include <boost/filesystem/path.hpp> + #include "mongo/db/dur_journalformat.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/alignedbuilder.h" +#include "mongo/util/concurrency/mutex.h" #include "mongo/util/logfile.h" namespace mongo { @@ -59,7 +64,6 @@ namespace mongo { boost::filesystem::path getFilePathFor(int filenumber) const; - unsigned long long lastFlushTime() const { return _lastFlushTime; } void cleanup(bool log); // closes and removes journal files unsigned long long curFileId() const { return _curFileId; } @@ -77,7 +81,7 @@ namespace mongo { /** check if time to rotate files. assure a file is open. * internally called with every commit */ - void _rotate(); + void _rotate(unsigned long long lsnOfCurrentJournalEntry); void _open(); void closeCurrentJournalFile(); @@ -101,12 +105,17 @@ namespace mongo { list<JFile> _oldJournalFiles; // use _curLogFileMutex // lsn related - static void preFlush(); - static void postFlush(); - unsigned long long _preFlushTime; - unsigned long long _lastFlushTime; // data < this time is fsynced in the datafiles (unless hard drive controller is caching) - bool _writeToLSNNeeded; - void updateLSNFile(); + friend void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber); + friend void notifyPreDataFileFlush(); + friend void notifyPostDataFileFlush(); + void updateLSNFile(unsigned long long lsnOfCurrentJournalEntry); + // data <= this time is in the shared view + AtomicUInt64 _lastSeqNumberWrittenToSharedView; + // data <= this time was in the shared view when the last flush to start started + AtomicUInt64 _preFlushTime; + // data <= this time is fsynced in the datafiles (unless hard drive controller is caching) + AtomicUInt64 _lastFlushTime; + AtomicInt32 _writeToLSNNeeded; }; } diff --git a/src/mongo/db/dur_preplogbuffer.cpp b/src/mongo/db/dur_preplogbuffer.cpp index 9686c21a526..c2f3b09adfa 100644 --- a/src/mongo/db/dur_preplogbuffer.cpp +++ b/src/mongo/db/dur_preplogbuffer.cpp @@ -168,7 +168,7 @@ namespace mongo { bb.reset(); h.setSectionLen(0xffffffff); // total length, will fill in later - h.seqNumber = getLastDataFileFlushTime(); + h.seqNumber = generateNextSeqNumber(); h.fileId = j.curFileId(); } diff --git a/src/mongo/db/dur_recover.cpp b/src/mongo/db/dur_recover.cpp index 2e972b45d86..cd177222760 100644 --- a/src/mongo/db/dur_recover.cpp +++ b/src/mongo/db/dur_recover.cpp @@ -385,27 +385,46 @@ namespace mongo { scoped_lock lk(_mx); RACECHECK - // Check the footer checksum before doing anything else. if (_recovering) { + // Check the footer checksum before doing anything else. verify( ((const char *)h) + sizeof(JSectHeader) == p ); if (!f->checkHash(h, len + sizeof(JSectHeader))) { log() << "journal section checksum doesn't match"; throw JournalSectionCorruptException(); } - } - if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs ) { - if( h->seqNumber != _lastSeqMentionedInConsoleLog ) { - static int n; - if( ++n < 10 ) { - log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl; + static uint64_t numJournalSegmentsSkipped = 0; + static const uint64_t kMaxSkippedSectionsToLog = 10; + if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) { + if (_appliedAnySections) { + severe() << "Journal section sequence number " << h->seqNumber + << " is lower than the threshold for applying (" + << h->seqNumber + ExtraKeepTimeMs + << ") but we have already applied some journal sections. " + << "This implies a corrupt journal file."; + fassertFailed(34369); } - else if( n == 10 ) { + + if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) { + log() << "recover skipping application of section seq:" << h->seqNumber + << " < lsn:" << _lastDataSyncedFromLastRun << endl; + } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) { log() << "recover skipping application of section more..." << endl; } - _lastSeqMentionedInConsoleLog = h->seqNumber; + _lastSeqSkipped = h->seqNumber; + return; + } + + if (!_appliedAnySections) { + _appliedAnySections = true; + if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) { + // Log the last skipped section's sequence number if it hasn't been logged before. + log() << "recover final skipped journal section had sequence number " + << _lastSeqSkipped; + } + log() << "recover applying initial journal section with sequence number " + << h->seqNumber; } - return; } auto_ptr<JournalSectionIterator> i; @@ -553,6 +572,12 @@ namespace mongo { } } + if (_lastSeqSkipped && !_appliedAnySections) { + log() << "recover journal replay completed without applying any sections. " + << "This can happen if there were no writes after the last fsync of the data " + << "files. Last skipped sections had sequence number " << _lastSeqSkipped; + } + close(); if (storageGlobalParams.durOptions & StorageGlobalParams::DurScanOnly) { diff --git a/src/mongo/db/dur_recover.h b/src/mongo/db/dur_recover.h index b36c7ea6562..9ddb2550a5a 100644 --- a/src/mongo/db/dur_recover.h +++ b/src/mongo/db/dur_recover.h @@ -57,7 +57,11 @@ namespace mongo { } last; public: RecoveryJob() : _lastDataSyncedFromLastRun(0), - _mx("recovery"), _recovering(false) { _lastSeqMentionedInConsoleLog = 1; } + _lastSeqSkipped(0), + _appliedAnySections(false), + _mx("recovery"), + _recovering(false) {} + void go(vector<boost::filesystem::path>& files); ~RecoveryJob(); @@ -79,7 +83,8 @@ namespace mongo { list<boost::shared_ptr<DurableMappedFile> > _mmfs; unsigned long long _lastDataSyncedFromLastRun; - unsigned long long _lastSeqMentionedInConsoleLog; + unsigned long long _lastSeqSkipped; + bool _appliedAnySections; public: mongo::mutex _mx; // protects _mmfs private: diff --git a/src/mongo/db/dur_writetodatafiles.cpp b/src/mongo/db/dur_writetodatafiles.cpp index d9a2e345115..6d823c7aa07 100644 --- a/src/mongo/db/dur_writetodatafiles.cpp +++ b/src/mongo/db/dur_writetodatafiles.cpp @@ -31,6 +31,7 @@ #include "mongo/pch.h" #include "mongo/db/dur_commitjob.h" +#include "mongo/db/dur_journal.h" #include "mongo/db/dur_recover.h" #include "mongo/db/dur_stats.h" #include "mongo/util/concurrency/mutex.h" @@ -100,6 +101,7 @@ namespace mongo { WRITETODATAFILES_Impl1(h, uncompressed); unsigned long long m = t.micros(); stats.curr->_writeToDataFilesMicros += m; + setLastSeqNumberWrittenToSharedView(h.seqNumber); LOG(2) << "journal WRITETODATAFILES " << m / 1000.0 << "ms" << endl; } diff --git a/src/mongo/util/mmap.cpp b/src/mongo/util/mmap.cpp index b4b9435a7f5..f021cd55022 100644 --- a/src/mongo/util/mmap.cpp +++ b/src/mongo/util/mmap.cpp @@ -138,17 +138,8 @@ namespace { return total; } - void nullFunc() { } - - // callback notifications - void (*MongoFile::notifyPreFlush)() = nullFunc; - void (*MongoFile::notifyPostFlush)() = nullFunc; - /*static*/ int MongoFile::flushAll( bool sync ) { - if ( sync ) notifyPreFlush(); - int x = _flushAll(sync); - if ( sync ) notifyPostFlush(); - return x; + return _flushAll(sync); } /*static*/ int MongoFile::_flushAll( bool sync ) { diff --git a/src/mongo/util/mmap.h b/src/mongo/util/mmap.h index 9d24da27c9a..33df258ad28 100644 --- a/src/mongo/util/mmap.h +++ b/src/mongo/util/mmap.h @@ -102,10 +102,6 @@ namespace mongo { */ static std::set<MongoFile*>& getAllFiles(); - // callbacks if you need them - static void (*notifyPreFlush)(); - static void (*notifyPostFlush)(); - static int flushAll( bool sync ); // returns n flushed static long long totalMappedLength(); static void closeAllFiles( std::stringstream &message ); |