-rw-r--r--  jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js |  2
-rw-r--r--  src/mongo/db/storage/mmap_v1/data_file_sync.cpp |  5
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journal.cpp | 79
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journal.h | 26
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp |  2
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journalimpl.h | 32
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp |  2
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recover.cpp | 48
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recover.h |  3
-rw-r--r--  src/mongo/db/storage/mmap_v1/mmap.cpp | 13
-rw-r--r--  src/mongo/db/storage/mmap_v1/mmap.h |  4
11 files changed, 142 insertions, 74 deletions
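
The change below replaces the wall-clock "last data file flush time" that used to stamp journal sections with sequence numbers from generateNextSeqNumber(), and has the DataFileSync thread bracket its flush of the data files with dur::notifyPreDataFileFlush() / dur::notifyPostDataFileFlush() so the LSN recorded for recovery can never get ahead of what has actually been flushed. As a rough illustration of those two mechanisms only (a minimal standalone C++ sketch with hypothetical names, not the MongoDB code itself), the pattern is:

// Illustrative sketch only (hypothetical names, not MongoDB code): a strictly
// increasing sequence-number generator plus the snapshot-before / publish-after
// flush handshake that keeps the on-disk LSN behind the data-file flush.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <mutex>

namespace sketch {

// Millisecond clock standing in for Listener::getElapsedTimeMillis().
uint64_t nowMillis() {
    using namespace std::chrono;
    return static_cast<uint64_t>(
        duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count());
}

std::mutex lastSeqMutex;
uint64_t lastSeq = 0;

// Returns unique, monotonically increasing numbers even if the clock stalls.
uint64_t nextSeqNumber() {
    const uint64_t now = nowMillis();
    std::lock_guard<std::mutex> lk(lastSeqMutex);
    if (now > lastSeq) {
        lastSeq = now;  // clock moved forward: adopt it
    } else {
        ++lastSeq;      // clock repeated or went backwards: still hand out a larger value
    }
    return lastSeq;
}

std::atomic<uint64_t> lastSeqInSharedView{0};  // set after a section reaches the shared mmap view
std::atomic<uint64_t> preFlushSeq{0};
std::atomic<uint64_t> lastFlushedSeq{0};       // safe lower bound to record as the LSN

// Journal writer: call once a section's writes are in the shared view.
void onSectionWrittenToSharedView(uint64_t seq) {
    lastSeqInSharedView.store(seq);
}

// Data-file flusher: snapshot before the flush starts...
void preDataFileFlush() {
    preFlushSeq.store(lastSeqInSharedView.load());
}

// ...and only after the flush completes is that snapshot known durable, so it
// becomes the LSN below which journal sections may be skipped on recovery.
void postDataFileFlush() {
    lastFlushedSeq.store(preFlushSeq.load());
}

}  // namespace sketch

The new fasserts in the patch (34370 while running, 34369 during recovery) guard this ordering: the recorded LSN must stay strictly behind the journal section currently being written, and replay must never encounter a skippable section after it has started applying sections.
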
diff --git a/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js b/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
index ee25b5874b5..c97d8320422 100644
--- a/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
+++ b/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
@@ -33,7 +33,7 @@
 
     // Set up replica set
     var replTest = new ReplSetTest({ name: 'bgIndexNoRetry', nodes: 3,
-                                     nodeOptions : {noIndexBuildRetry:""} });
+                                     nodeOptions : {noIndexBuildRetry:"", syncdelay:1} });
     var nodenames = replTest.nodeList();
 
     // We can't use an arbiter as the third node because the -auth test tries to log on there
diff --git a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
index cf33e24d008..4fafae825ea 100644
--- a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
+++ b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
@@ -36,6 +36,7 @@
 #include "mongo/db/commands/server_status_metric.h"
 #include "mongo/db/service_context.h"
 #include "mongo/db/instance.h"
+#include "mongo/db/storage/mmap_v1/dur_journal.h"
 #include "mongo/db/storage/mmap_v1/mmap.h"
 #include "mongo/db/storage/mmap_v1/mmap_v1_options.h"
 #include "mongo/db/storage/storage_options.h"
@@ -81,7 +82,11 @@ void DataFileSync::run() {
             Date_t start = jsTime();
             StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
+
+            dur::notifyPreDataFileFlush();
             int numFiles = storageEngine->flushAllFiles(true);
+            dur::notifyPostDataFileFlush();
+
             time_flushing = durationCount<Milliseconds>(jsTime() - start);
 
             _flushed(time_flushing);
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.cpp b/src/mongo/db/storage/mmap_v1/dur_journal.cpp
index 04b5182544f..14f329e0931 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal.cpp
@@ -198,9 +198,10 @@ Journal::Journal() {
     _nextFileNumber = 0;
     _curLogFile = 0;
     _curFileId = 0;
-    _preFlushTime = 0;
-    _lastFlushTime = 0;
-    _writeToLSNNeeded = false;
+    _lastSeqNumberWrittenToSharedView.store(0);
+    _preFlushTime.store(0);
+    _lastFlushTime.store(0);
+    _writeToLSNNeeded.store(false);
 }
 
 boost::filesystem::path Journal::getFilePathFor(int filenumber) const {
@@ -550,12 +551,9 @@ void Journal::_open() {
 
 void Journal::init() {
     verify(_curLogFile == 0);
-    MongoFile::notifyPreFlush = preFlush;
-    MongoFile::notifyPostFlush = postFlush;
 }
 
 void Journal::open() {
-    verify(MongoFile::notifyPreFlush == preFlush);
     stdx::lock_guard<SimpleMutex> lk(_curLogFileMutex);
     _open();
 }
@@ -612,18 +610,28 @@ unsigned long long journalReadLSN() {
     return 0;
 }
 
-unsigned long long getLastDataFileFlushTime() {
-    return j.lastFlushTime();
-}
-
 /** remember "last sequence number" to speed recoveries
     concurrency: called by durThread only.
 */
-void Journal::updateLSNFile() {
-    if (!_writeToLSNNeeded)
+void Journal::updateLSNFile(unsigned long long lsnOfCurrentJournalEntry) {
+    if (!_writeToLSNNeeded.load())
         return;
-    _writeToLSNNeeded = false;
+    _writeToLSNNeeded.store(false);
     try {
+        // Don't read from _lastFlushTime again in this function since it may change.
+        const uint64_t copyOfLastFlushTime = _lastFlushTime.load();
+
+        // Only write an LSN that is older than the journal entry we are in the middle of writing.
+        // If this trips, it means that _lastFlushTime got ahead of what is actually in the data
+        // files because lsnOfCurrentJournalEntry includes data that hasn't yet been written to the
+        // data files.
+        if (copyOfLastFlushTime >= lsnOfCurrentJournalEntry) {
+            severe() << "Attempting to update LSNFile to " << copyOfLastFlushTime
+                     << " which is not older than the current journal sequence number "
+                     << lsnOfCurrentJournalEntry;
+            fassertFailed(34370);
+        }
+
         // os can flush as it likes. if it flushes slowly, we will just do extra work on recovery.
         // however, given we actually close the file, that seems unlikely.
         File f;
@@ -633,9 +641,9 @@ void Journal::updateLSNFile() {
             log() << "warning: open of lsn file failed" << endl;
             return;
         }
-        LOG(1) << "lsn set " << _lastFlushTime << endl;
+        LOG(1) << "lsn set " << copyOfLastFlushTime << endl;
         LSNFile lsnf;
-        lsnf.set(_lastFlushTime);
+        lsnf.set(copyOfLastFlushTime);
         f.write(0, (char*)&lsnf, sizeof(lsnf));
         // do we want to fsync here? if we do it probably needs to be async so the durthread
         // is not delayed.
@@ -645,13 +653,34 @@ void Journal::updateLSNFile() {
     }
 }
 
-void Journal::preFlush() {
-    j._preFlushTime = Listener::getElapsedTimeMillis();
+namespace {
+stdx::mutex lastGeneratedSeqNumberMutex;
+uint64_t lastGeneratedSeqNumber = 0;
+}
+
+uint64_t generateNextSeqNumber() {
+    const uint64_t now = Listener::getElapsedTimeMillis();
+    stdx::lock_guard<stdx::mutex> lock(lastGeneratedSeqNumberMutex);
+    if (now > lastGeneratedSeqNumber) {
+        lastGeneratedSeqNumber = now;
+    } else {
+        // Make sure we return unique monotonically increasing numbers.
+        lastGeneratedSeqNumber++;
+    }
+    return lastGeneratedSeqNumber;
+}
+
+void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber) {
+    j._lastSeqNumberWrittenToSharedView.store(seqNumber);
+}
+
+void notifyPreDataFileFlush() {
+    j._preFlushTime.store(j._lastSeqNumberWrittenToSharedView.load());
 }
 
-void Journal::postFlush() {
-    j._lastFlushTime = j._preFlushTime;
-    j._writeToLSNNeeded = true;
+void notifyPostDataFileFlush() {
+    j._lastFlushTime.store(j._preFlushTime.load());
+    j._writeToLSNNeeded.store(true);
 }
 
 // call from within _curLogFileMutex
@@ -661,7 +690,7 @@ void Journal::closeCurrentJournalFile() {
 
     JFile jf;
     jf.filename = _curLogFile->_name;
-    jf.lastEventTimeMs = Listener::getElapsedTimeMillis();
+    jf.lastEventTimeMs = generateNextSeqNumber();
     _oldJournalFiles.push_back(jf);
 
     delete _curLogFile;  // close
@@ -680,7 +709,7 @@ void Journal::removeUnneededJournalFiles() {
         // '_lastFlushTime' is the start time of the last successful flush of the data files to
         // disk. We can't delete this journal file until the last successful flush time is at least
         // 10 seconds after 'f.lastEventTimeMs'.
-        if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime) {
+        if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime.load()) {
             // eligible for deletion
             boost::filesystem::path p(f.filename);
             log() << "old journal file will be removed: " << f.filename << endl;
@@ -693,11 +722,11 @@
     }
 }
 
-void Journal::_rotate() {
+void Journal::_rotate(unsigned long long lsnOfCurrentJournalEntry) {
     if (inShutdown() || !_curLogFile)
         return;
 
-    j.updateLSNFile();
+    j.updateLSNFile(lsnOfCurrentJournalEntry);
 
     if (_curLogFile && _written < DataLimitPerJournalFile)
         return;
@@ -785,7 +814,7 @@ void Journal::journal(const JSectHeader& h, const AlignedBuilder& uncompressed)
         verify(w <= L);
         stats.curr()->_journaledBytes += L;
         _curLogFile->synchronousAppend((const void*)b.buf(), L);
-        _rotate();
+        _rotate(h.seqNumber);
     } catch (std::exception& e) {
         log() << "error exception in dur::journal " << e.what() << endl;
         throw;
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.h b/src/mongo/db/storage/mmap_v1/dur_journal.h
index c9911b99507..e88ea7dabb5 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journal.h
@@ -30,6 +30,8 @@
 
 #pragma once
 
+#include <cstdint>
+
 namespace mongo {
 
 class AlignedBuilder;
@@ -51,12 +53,17 @@ void journalCleanup(bool log = false);
 /** assure journal/ dir exists. throws */
 void journalMakeDir();
 
-/** check if time to rotate files; assure a file is open.
-    done separately from the journal() call as we can do this part
-    outside of lock.
-    only called by durThread.
+/**
+ * Generates the next sequence number for use in the journal, guaranteed to be greater than all
+ * prior sequence numbers.
+ */
+uint64_t generateNextSeqNumber();
+
+/**
+ * Informs the journaling system that all writes on or before the passed in sequence number have
+ * been written to the data files' shared mmap view.
 */
-void journalRotate();
+void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
 
 /** flag that something has gone wrong during writing to the journal
     (not for recovery mode)
@@ -66,8 +73,6 @@ void journalingFailure(const char* msg);
 /** read lsn from disk from the last run before doing recovery */
 unsigned long long journalReadLSN();
 
-unsigned long long getLastDataFileFlushTime();
-
 /** never throws.
     @param anyFiles by default we only look at j._* files. If anyFiles is true, return true
           if there are any files in the journal directory. acquirePathLock() uses this to
@@ -83,5 +88,12 @@ void WRITETOJOURNAL(const JSectHeader& h, const AlignedBuilder& uncompressed);
 
 // in case disk controller buffers writes
 const long long ExtraKeepTimeMs = 10000;
+
+/**
+ * Call these before (pre) and after (post) the datafiles are flushed to disk by the DataFileSync
+ * thread. These should not be called for any other flushes.
+ */
+void notifyPreDataFileFlush();
+void notifyPostDataFileFlush();
 }  // namespace dur
 }  // namespace mongo
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
index 78b40e25313..9670aa80300 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
@@ -64,6 +64,8 @@ void WRITETODATAFILES(const JSectHeader& h, const AlignedBuilder& uncompressed)
     const long long m = t.micros();
     stats.curr()->_writeToDataFilesMicros += m;
 
+    setLastSeqNumberWrittenToSharedView(h.seqNumber);
+
     LOG(4) << "journal WRITETODATAFILES " << m / 1000.0 << "ms";
 }
 
diff --git a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
index 77e79ccb8d1..62d28db7036 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
@@ -30,8 +30,13 @@
 
 #pragma once
 
+#include <boost/filesystem/path.hpp>
+
+#include "mongo/db/storage/mmap_v1/aligned_builder.h"
 #include "mongo/db/storage/mmap_v1/dur_journalformat.h"
 #include "mongo/db/storage/mmap_v1/logfile.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/concurrency/mutex.h"
 
 namespace mongo {
 namespace dur {
@@ -59,9 +64,6 @@ public:
 
     boost::filesystem::path getFilePathFor(int filenumber) const;
 
-    unsigned long long lastFlushTime() const {
-        return _lastFlushTime;
-    }
     void cleanup(bool log);  // closes and removes journal files
 
     unsigned long long curFileId() const {
@@ -81,14 +83,14 @@ private:
     /** check if time to rotate files. assure a file is open.
     *   internally called with every commit
     */
-    void _rotate();
+    void _rotate(unsigned long long lsnOfCurrentJournalEntry);
 
     void _open();
    void closeCurrentJournalFile();
     void removeUnneededJournalFiles();
 
-    unsigned long long _written;  // bytes written so far to the current journal (log) file
-    unsigned _nextFileNumber;
+    unsigned long long _written = 0;  // bytes written so far to the current journal (log) file
+    unsigned _nextFileNumber = 0;
 
     SimpleMutex _curLogFileMutex;
 
@@ -105,13 +107,17 @@ private:
     std::list<JFile> _oldJournalFiles;  // use _curLogFileMutex
 
     // lsn related
-    static void preFlush();
-    static void postFlush();
-    unsigned long long _preFlushTime;
-    // data < this time is fsynced in the datafiles (unless hard drive controller is caching)
-    unsigned long long _lastFlushTime;
-    bool _writeToLSNNeeded;
-    void updateLSNFile();
+    friend void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
+    friend void notifyPreDataFileFlush();
+    friend void notifyPostDataFileFlush();
+    void updateLSNFile(unsigned long long lsnOfCurrentJournalEntry);
+    // data <= this time is in the shared view
+    AtomicUInt64 _lastSeqNumberWrittenToSharedView;
+    // data <= this time was in the shared view when the last flush to start started
+    AtomicUInt64 _preFlushTime;
+    // data <= this time is fsynced in the datafiles (unless hard drive controller is caching)
+    AtomicUInt64 _lastFlushTime;
+    AtomicWord<bool> _writeToLSNNeeded;
 };
 }
 }
diff --git a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
index 232e31290e5..9a68deb3752 100644
--- a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
@@ -175,7 +175,7 @@ static void _PREPLOGBUFFER(JSectHeader& h, AlignedBuilder& bb) {
 
     // Invalidate the total length, we will fill it in later.
     h.setSectionLen(0xffffffff);
 
-    h.seqNumber = getLastDataFileFlushTime();
+    h.seqNumber = generateNextSeqNumber();
     h.fileId = j.curFileId();
 
     // Ops other than basic writes (DurOp's) go first
diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.cpp b/src/mongo/db/storage/mmap_v1/dur_recover.cpp
index e3a686fc21e..c29036b26a3 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recover.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recover.cpp
@@ -253,7 +253,10 @@ static string fileName(const char* dbName, int fileNo) {
 
 
 RecoveryJob::RecoveryJob()
-    : _recovering(false), _lastDataSyncedFromLastRun(0), _lastSeqMentionedInConsoleLog(1) {}
+    : _recovering(false),
+      _lastDataSyncedFromLastRun(0),
+      _lastSeqSkipped(0),
+      _appliedAnySections(false) {}
 
 RecoveryJob::~RecoveryJob() {
     DESTRUCTOR_GUARD(if (!_mmfs.empty()) {} close();)
@@ -382,27 +385,46 @@ void RecoveryJob::processSection(const JSectHeader* h,
     LockMongoFilesShared lkFiles;  // for RecoveryJob::Last
     stdx::lock_guard<stdx::mutex> lk(_mx);
 
-    // Check the footer checksum before doing anything else.
     if (_recovering) {
+        // Check the footer checksum before doing anything else.
         verify(((const char*)h) + sizeof(JSectHeader) == p);
         if (!f->checkHash(h, len + sizeof(JSectHeader))) {
             log() << "journal section checksum doesn't match";
             throw JournalSectionCorruptException();
         }
-    }
 
-    if (_recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
-        if (h->seqNumber != _lastSeqMentionedInConsoleLog) {
-            static int n;
-            if (++n < 10) {
+        static uint64_t numJournalSegmentsSkipped = 0;
+        static const uint64_t kMaxSkippedSectionsToLog = 10;
+        if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
+            if (_appliedAnySections) {
+                severe() << "Journal section sequence number " << h->seqNumber
+                         << " is lower than the threshold for applying ("
+                         << h->seqNumber + ExtraKeepTimeMs
+                         << ") but we have already applied some journal sections. This implies a "
+                         << "corrupt journal file.";
+                fassertFailed(34369);
+            }
+
+            if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) {
                 log() << "recover skipping application of section seq:" << h->seqNumber
                       << " < lsn:" << _lastDataSyncedFromLastRun << endl;
-            } else if (n == 10) {
+            } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) {
                 log() << "recover skipping application of section more..." << endl;
             }
-            _lastSeqMentionedInConsoleLog = h->seqNumber;
+            _lastSeqSkipped = h->seqNumber;
+            return;
+        }
+
+        if (!_appliedAnySections) {
+            _appliedAnySections = true;
+            if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) {
+                // Log the last skipped section's sequence number if it hasn't been logged before.
+                log() << "recover final skipped journal section had sequence number "
+                      << _lastSeqSkipped;
+            }
+            log() << "recover applying initial journal section with sequence number "
+                  << h->seqNumber;
         }
-        return;
     }
 
     unique_ptr<JournalSectionIterator> i;
@@ -551,6 +573,12 @@ void RecoveryJob::go(vector<boost::filesystem::path>& files) {
         }
     }
 
+    if (_lastSeqSkipped && !_appliedAnySections) {
+        log() << "recover journal replay completed without applying any sections. "
+              << "This can happen if there were no writes after the last fsync of the data files. "
+              << "Last skipped sections had sequence number " << _lastSeqSkipped;
+    }
+
     close();
 
     if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalScanOnly) {
diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.h b/src/mongo/db/storage/mmap_v1/dur_recover.h
index e05e7926215..39925f9063f 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recover.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recover.h
@@ -94,7 +94,8 @@ private:
 
     bool _recovering;
     unsigned long long _lastDataSyncedFromLastRun;
-    unsigned long long _lastSeqMentionedInConsoleLog;
+    unsigned long long _lastSeqSkipped;
+    bool _appliedAnySections;
 
     static RecoveryJob& _instance;
 
diff --git a/src/mongo/db/storage/mmap_v1/mmap.cpp b/src/mongo/db/storage/mmap_v1/mmap.cpp
index 57559d3038e..618e5777de6 100644
--- a/src/mongo/db/storage/mmap_v1/mmap.cpp
+++ b/src/mongo/db/storage/mmap_v1/mmap.cpp
@@ -166,19 +166,8 @@ void MongoFile::closeAllFiles(stringstream& message) {
     return total;
 }
 
-void nullFunc() {}
-
-// callback notifications
-void (*MongoFile::notifyPreFlush)() = nullFunc;
-void (*MongoFile::notifyPostFlush)() = nullFunc;
-
 /*static*/ int MongoFile::flushAll(bool sync) {
-    if (sync)
-        notifyPreFlush();
-    int x = _flushAll(sync);
-    if (sync)
-        notifyPostFlush();
-    return x;
+    return _flushAll(sync);
 }
 
 /*static*/ int MongoFile::_flushAll(bool sync) {
diff --git a/src/mongo/db/storage/mmap_v1/mmap.h b/src/mongo/db/storage/mmap_v1/mmap.h
index 6413dc26127..a1435ad95d8 100644
--- a/src/mongo/db/storage/mmap_v1/mmap.h
+++ b/src/mongo/db/storage/mmap_v1/mmap.h
@@ -129,10 +129,6 @@ public:
      */
     static std::set<MongoFile*>& getAllFiles();
 
-    // callbacks if you need them
-    static void (*notifyPreFlush)();
-    static void (*notifyPostFlush)();
-
     static int flushAll(bool sync);  // returns n flushed
     static long long totalMappedLength();
     static void closeAllFiles(std::stringstream& message);