diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/storage/mmap_v1/data_file_sync.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_journal.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_journal.h | 30 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_journalimpl.h | 32 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_recover.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_recover.h | 3 |
8 files changed, 138 insertions, 60 deletions
diff --git a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp index 2c9830595fc..fa80c2942f7 100644 --- a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp +++ b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp @@ -35,6 +35,7 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/global_environment_experiment.h" #include "mongo/db/instance.h" +#include "mongo/db/storage/mmap_v1/dur_journal.h" #include "mongo/db/storage/mmap_v1/mmap_v1_options.h" #include "mongo/db/storage_options.h" #include "mongo/util/exit.h" @@ -80,7 +81,9 @@ void DataFileSync::run() { Date_t start = jsTime(); StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine(); + dur::notifyPreDataFileFlush(); int numFiles = storageEngine->flushAllFiles(true); + dur::notifyPostDataFileFlush(); time_flushing = (int)(jsTime() - start); _flushed(time_flushing); diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.cpp b/src/mongo/db/storage/mmap_v1/dur_journal.cpp index 2888cbdec63..14581b15a01 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_journal.cpp @@ -197,9 +197,10 @@ Journal::Journal() : _curLogFileMutex("JournalLfMutex") { _nextFileNumber = 0; _curLogFile = 0; _curFileId = 0; - _preFlushTime = 0; - _lastFlushTime = 0; - _writeToLSNNeeded = false; + _lastSeqNumberWrittenToSharedView.store(0); + _preFlushTime.store(0); + _lastFlushTime.store(0); + _writeToLSNNeeded.store(false); } boost::filesystem::path Journal::getFilePathFor(int filenumber) const { @@ -548,12 +549,9 @@ void Journal::_open() { void Journal::init() { verify(_curLogFile == 0); - MongoFile::notifyPreFlush = preFlush; - MongoFile::notifyPostFlush = postFlush; } void Journal::open() { - verify(MongoFile::notifyPreFlush == preFlush); SimpleMutex::scoped_lock lk(_curLogFileMutex); _open(); } @@ -610,18 +608,28 @@ unsigned long long journalReadLSN() { return 0; } -unsigned long long getLastDataFileFlushTime() { - return j.lastFlushTime(); -} - /** remember "last sequence number" to speed recoveries concurrency: called by durThread only. */ -void Journal::updateLSNFile() { - if (!_writeToLSNNeeded) +void Journal::updateLSNFile(unsigned long long lsnOfCurrentJournalEntry) { + if (!_writeToLSNNeeded.load()) return; - _writeToLSNNeeded = false; + _writeToLSNNeeded.store(false); try { + // Don't read from _lastFlushTime again in this function since it may change. + const uint64_t copyOfLastFlushTime = _lastFlushTime.load(); + + // Only write an LSN that is older than the journal entry we are in the middle of writing. + // If this trips, it means that _lastFlushTime got ahead of what is actually in the data + // files because lsnOfCurrentJournalEntry includes data that hasn't yet been written to the + // data files. + if (copyOfLastFlushTime >= lsnOfCurrentJournalEntry) { + severe() << "Attempting to update LSNFile to " << copyOfLastFlushTime + << " which is not older than the current journal sequence number " + << lsnOfCurrentJournalEntry; + fassertFailed(34370); + } + // os can flush as it likes. if it flushes slowly, we will just do extra work on recovery. // however, given we actually close the file, that seems unlikely. File f; @@ -631,9 +639,9 @@ void Journal::updateLSNFile() { log() << "warning: open of lsn file failed" << endl; return; } - LOG(1) << "lsn set " << _lastFlushTime << endl; + LOG(1) << "lsn set " << copyOfLastFlushTime << endl; LSNFile lsnf; - lsnf.set(_lastFlushTime); + lsnf.set(copyOfLastFlushTime); f.write(0, (char*)&lsnf, sizeof(lsnf)); // do we want to fsync here? if we do it probably needs to be async so the durthread // is not delayed. @@ -643,13 +651,34 @@ void Journal::updateLSNFile() { } } -void Journal::preFlush() { - j._preFlushTime = Listener::getElapsedTimeMillis(); +namespace { +SimpleMutex lastGeneratedSeqNumberMutex("lastGeneratedSeqNumberMutex"); +uint64_t lastGeneratedSeqNumber = 0; +} + +uint64_t generateNextSeqNumber() { + const uint64_t now = Listener::getElapsedTimeMillis(); + SimpleMutex::scoped_lock lock(lastGeneratedSeqNumberMutex); + if (now > lastGeneratedSeqNumber) { + lastGeneratedSeqNumber = now; + } else { + // Make sure we return unique monotonically increasing numbers. + lastGeneratedSeqNumber++; + } + return lastGeneratedSeqNumber; } -void Journal::postFlush() { - j._lastFlushTime = j._preFlushTime; - j._writeToLSNNeeded = true; +void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber) { + j._lastSeqNumberWrittenToSharedView.store(seqNumber); +} + +void notifyPreDataFileFlush() { + j._preFlushTime.store(j._lastSeqNumberWrittenToSharedView.load()); +} + +void notifyPostDataFileFlush() { + j._lastFlushTime.store(j._preFlushTime.load()); + j._writeToLSNNeeded.store(true); } // call from within _curLogFileMutex @@ -659,7 +688,7 @@ void Journal::closeCurrentJournalFile() { JFile jf; jf.filename = _curLogFile->_name; - jf.lastEventTimeMs = Listener::getElapsedTimeMillis(); + jf.lastEventTimeMs = generateNextSeqNumber(); _oldJournalFiles.push_back(jf); delete _curLogFile; // close @@ -678,7 +707,7 @@ void Journal::removeUnneededJournalFiles() { // '_lastFlushTime' is the start time of the last successful flush of the data files to // disk. We can't delete this journal file until the last successful flush time is at least // 10 seconds after 'f.lastEventTimeMs'. - if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime) { + if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime.load()) { // eligible for deletion boost::filesystem::path p(f.filename); log() << "old journal file will be removed: " << f.filename << endl; @@ -691,13 +720,12 @@ void Journal::removeUnneededJournalFiles() { } } -void Journal::_rotate() { +void Journal::_rotate(unsigned long long lsnOfCurrentJournalEntry) { _curLogFileMutex.dassertLocked(); - if (inShutdown() || !_curLogFile) return; - j.updateLSNFile(); + j.updateLSNFile(lsnOfCurrentJournalEntry); if (_curLogFile && _written < DataLimitPerJournalFile) return; @@ -785,7 +813,7 @@ void Journal::journal(const JSectHeader& h, const AlignedBuilder& uncompressed) verify(w <= L); stats.curr()->_journaledBytes += L; _curLogFile->synchronousAppend((const void*)b.buf(), L); - _rotate(); + _rotate(h.seqNumber); } catch (std::exception& e) { log() << "error exception in dur::journal " << e.what() << endl; throw; diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.h b/src/mongo/db/storage/mmap_v1/dur_journal.h index 07def586090..68e3572f5a3 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal.h +++ b/src/mongo/db/storage/mmap_v1/dur_journal.h @@ -30,6 +30,8 @@ #pragma once +#include <cstdint> + namespace mongo { class AlignedBuilder; @@ -51,12 +53,17 @@ void journalCleanup(bool log = false); /** assure journal/ dir exists. throws */ void journalMakeDir(); -/** check if time to rotate files; assure a file is open. - done separately from the journal() call as we can do this part - outside of lock. - only called by durThread. +/** + * Generates the next sequence number for use in the journal, guaranteed to be greater than all + * prior sequence numbers. + */ +uint64_t generateNextSeqNumber(); + +/** + * Informs the journaling system that all writes on or before the passed in sequence number have + * been written to the data files' shared mmap view. */ -void journalRotate(); +void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber); /** flag that something has gone wrong during writing to the journal (not for recovery mode) @@ -66,8 +73,6 @@ void journalingFailure(const char* msg); /** read lsn from disk from the last run before doing recovery */ unsigned long long journalReadLSN(); -unsigned long long getLastDataFileFlushTime(); - /** never throws. @param anyFiles by default we only look at j._* files. If anyFiles is true, return true if there are any files in the journal directory. acquirePathLock() uses this to @@ -85,5 +90,12 @@ void WRITETOJOURNAL(const JSectHeader& h, const AlignedBuilder& uncompressed); const long long ExtraKeepTimeMs = 10000; const unsigned JournalCommitIntervalDefault = 100; -} -} + +/** + * Call these before (pre) and after (post) the datafiles are flushed to disk by the DataFileSync + * thread. These should not be called for any other flushes. + */ +void notifyPreDataFileFlush(); +void notifyPostDataFileFlush(); +} // namespace dur +} // namespace mongo diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp index 50fe9d28aa1..8b231555202 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp @@ -65,6 +65,8 @@ void WRITETODATAFILES(const JSectHeader& h, const AlignedBuilder& uncompressed) const long long m = t.micros(); stats.curr()->_writeToDataFilesMicros += m; + setLastSeqNumberWrittenToSharedView(h.seqNumber); + LOG(4) << "journal WRITETODATAFILES " << m / 1000.0 << "ms"; } diff --git a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h index e51608b69e4..6a47753e977 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h +++ b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h @@ -30,7 +30,12 @@ #pragma once +#include <boost/filesystem/path.hpp> + +#include "mongo/db/storage/mmap_v1/aligned_builder.h" #include "mongo/db/storage/mmap_v1/dur_journalformat.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/concurrency/mutex.h" #include "mongo/util/logfile.h" namespace mongo { @@ -59,9 +64,6 @@ public: boost::filesystem::path getFilePathFor(int filenumber) const; - unsigned long long lastFlushTime() const { - return _lastFlushTime; - } void cleanup(bool log); // closes and removes journal files unsigned long long curFileId() const { @@ -81,14 +83,14 @@ private: /** check if time to rotate files. assure a file is open. * internally called with every commit */ - void _rotate(); + void _rotate(unsigned long long lsnOfCurrentJournalEntry); void _open(); void closeCurrentJournalFile(); void removeUnneededJournalFiles(); - unsigned long long _written; // bytes written so far to the current journal (log) file - unsigned _nextFileNumber; + unsigned long long _written = 0; // bytes written so far to the current journal (log) file + unsigned _nextFileNumber = 0; SimpleMutex _curLogFileMutex; @@ -105,13 +107,17 @@ private: std::list<JFile> _oldJournalFiles; // use _curLogFileMutex // lsn related - static void preFlush(); - static void postFlush(); - unsigned long long _preFlushTime; - // data < this time is fsynced in the datafiles (unless hard drive controller is caching) - unsigned long long _lastFlushTime; - bool _writeToLSNNeeded; - void updateLSNFile(); + friend void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber); + friend void notifyPreDataFileFlush(); + friend void notifyPostDataFileFlush(); + void updateLSNFile(unsigned long long lsnOfCurrentJournalEntry); + // data <= this time is in the shared view + AtomicUInt64 _lastSeqNumberWrittenToSharedView; + // data <= this time was in the shared view when the last flush to start started + AtomicUInt64 _preFlushTime; + // data <= this time is fsynced in the datafiles (unless hard drive controller is caching) + AtomicUInt64 _lastFlushTime; + AtomicWord<bool> _writeToLSNNeeded; }; } } diff --git a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp index ea3b4e85148..d1ecdba5b99 100644 --- a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp @@ -175,7 +175,7 @@ static void _PREPLOGBUFFER(JSectHeader& h, AlignedBuilder& bb) { // Invalidate the total length, we will fill it in later. h.setSectionLen(0xffffffff); - h.seqNumber = getLastDataFileFlushTime(); + h.seqNumber = generateNextSeqNumber(); h.fileId = j.curFileId(); // Ops other than basic writes (DurOp's) go first diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.cpp b/src/mongo/db/storage/mmap_v1/dur_recover.cpp index c37fbd23ef7..d0e461ed0d7 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recover.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_recover.cpp @@ -255,7 +255,8 @@ RecoveryJob::RecoveryJob() : _mx("recovery"), _recovering(false), _lastDataSyncedFromLastRun(0), - _lastSeqMentionedInConsoleLog(1) {} + _lastSeqSkipped(0), + _appliedAnySections(false) {} RecoveryJob::~RecoveryJob() { DESTRUCTOR_GUARD(if (!_mmfs.empty()) {} close();) @@ -384,27 +385,46 @@ void RecoveryJob::processSection(const JSectHeader* h, LockMongoFilesShared lkFiles; // for RecoveryJob::Last scoped_lock lk(_mx); - // Check the footer checksum before doing anything else. if (_recovering) { + // Check the footer checksum before doing anything else. verify(((const char*)h) + sizeof(JSectHeader) == p); if (!f->checkHash(h, len + sizeof(JSectHeader))) { log() << "journal section checksum doesn't match"; throw JournalSectionCorruptException(); } - } - if (_recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) { - if (h->seqNumber != _lastSeqMentionedInConsoleLog) { - static int n; - if (++n < 10) { + static uint64_t numJournalSegmentsSkipped = 0; + static const uint64_t kMaxSkippedSectionsToLog = 10; + if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) { + if (_appliedAnySections) { + severe() << "Journal section sequence number " << h->seqNumber + << " is lower than the threshold for applying (" + << h->seqNumber + ExtraKeepTimeMs + << ") but we have already applied some journal sections. This implies a " + << "corrupt journal file."; + fassertFailed(34369); + } + + if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) { log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl; - } else if (n == 10) { + } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) { log() << "recover skipping application of section more..." << endl; } - _lastSeqMentionedInConsoleLog = h->seqNumber; + _lastSeqSkipped = h->seqNumber; + return; + } + + if (!_appliedAnySections) { + _appliedAnySections = true; + if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) { + // Log the last skipped section's sequence number if it hasn't been logged before. + log() << "recover final skipped journal section had sequence number " + << _lastSeqSkipped; + } + log() << "recover applying initial journal section with sequence number " + << h->seqNumber; } - return; } auto_ptr<JournalSectionIterator> i; @@ -553,6 +573,12 @@ void RecoveryJob::go(vector<boost::filesystem::path>& files) { } } + if (_lastSeqSkipped && !_appliedAnySections) { + log() << "recover journal replay completed without applying any sections. " + << "This can happen if there were no writes after the last fsync of the data files. " + << "Last skipped sections had sequence number " << _lastSeqSkipped; + } + close(); if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalScanOnly) { diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.h b/src/mongo/db/storage/mmap_v1/dur_recover.h index a9f20f2e8cf..5c22249363d 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recover.h +++ b/src/mongo/db/storage/mmap_v1/dur_recover.h @@ -95,7 +95,8 @@ private: bool _recovering; unsigned long long _lastDataSyncedFromLastRun; - unsigned long long _lastSeqMentionedInConsoleLog; + unsigned long long _lastSeqSkipped; + bool _appliedAnySections; static RecoveryJob& _instance; |