author     Mathias Stearn <mathias@10gen.com>        2016-01-21 15:35:16 -0500
committer  Mathias Stearn <redbeard0531@gmail.com>   2016-02-02 15:29:22 -0500
commit     c259a7ef3886ae645a58a36a22a9e6e5e45875c3 (patch)
tree       bac0c8559130949a7117318bae177554387a2b97
parent     0e2647c7ce1fde853575c912a6160770cdbb7b65 (diff)
SERVER-22261 Ensure LSNFile only contains sequence numbers flushed to data files
(cherry picked from commit 0386400f728588140ccd9c896f7b87370d9bc866)
-rw-r--r--  jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js | 2
-rw-r--r--  src/mongo/db/storage/mmap_v1/data_file_sync.cpp | 3
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journal.cpp | 80
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journal.h | 30
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp | 2
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_journalimpl.h | 32
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp | 2
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recover.cpp | 46
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recover.h | 3
-rw-r--r--  src/mongo/util/mmap.cpp | 13
-rw-r--r--  src/mongo/util/mmap.h | 4
11 files changed, 140 insertions, 77 deletions
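
For orientation, a minimal model (illustrative names only, not the actual MongoDB classes) of the three watermarks this patch threads through the journaling and flush paths; each file's hunks below update one of them:

    // Sketch: the state SERVER-22261 tracks, in simplified form.
    #include <atomic>
    #include <cstdint>

    // Watermarks in non-increasing order of freshness:
    //   lastSeqInSharedView >= preFlushSnapshot >= lastFlushedToDataFiles
    // Only lastFlushedToDataFiles may ever be persisted to journal/lsn, and only
    // while it is strictly older than the journal section currently being written.
    struct DurWatermarks {
        std::atomic<uint64_t> lastSeqInSharedView{0};     // set after WRITETODATAFILES copies a section
        std::atomic<uint64_t> preFlushSnapshot{0};        // taken just before flushAllFiles(true)
        std::atomic<uint64_t> lastFlushedToDataFiles{0};  // promoted once the flush returns
    };
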
diff --git a/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js b/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
index a2e5265463c..d279505eb6b 100644
--- a/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
+++ b/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
@@ -36,7 +36,7 @@ if (0) {
// Set up replica set
var replTest = new ReplSetTest({ name: 'bgIndexNoRetry', nodes: 3,
- nodeOptions : {noIndexBuildRetry:""} });
+ nodeOptions : {noIndexBuildRetry:"", syncdelay:1} });
var nodenames = replTest.nodeList();
// We can't use an arbiter as the third node because the -auth test tries to log on there
diff --git a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
index 2c9830595fc..fa80c2942f7 100644
--- a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
+++ b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/instance.h"
+#include "mongo/db/storage/mmap_v1/dur_journal.h"
#include "mongo/db/storage/mmap_v1/mmap_v1_options.h"
#include "mongo/db/storage_options.h"
#include "mongo/util/exit.h"
@@ -80,7 +81,9 @@ void DataFileSync::run() {
Date_t start = jsTime();
StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine();
+ dur::notifyPreDataFileFlush();
int numFiles = storageEngine->flushAllFiles(true);
+ dur::notifyPostDataFileFlush();
time_flushing = (int)(jsTime() - start);
_flushed(time_flushing);
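
The ordering of those two calls around flushAllFiles() is the whole point of this hunk. A hedged sketch of the equivalent logic, using assumed names rather than the real dur:: API:

    #include <atomic>
    #include <cstdint>

    namespace sketch {
    std::atomic<uint64_t> lastSeqInSharedView{0};     // advanced by the journal writer thread
    std::atomic<uint64_t> lastFlushedToDataFiles{0};  // the only value journal/lsn may record

    void flushAllDataFiles() { /* stand-in for storageEngine->flushAllFiles(true) */ }

    void dataFileSyncPass() {
        // Snapshot *before* the flush: sections copied into the shared view while the
        // msync is in flight may not have reached disk, so they must not be credited
        // once the flush returns.
        const uint64_t preFlushSnapshot = lastSeqInSharedView.load();
        flushAllDataFiles();
        lastFlushedToDataFiles.store(preFlushSnapshot);
    }
    }  // namespace sketch
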
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.cpp b/src/mongo/db/storage/mmap_v1/dur_journal.cpp
index 2888cbdec63..14581b15a01 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal.cpp
@@ -197,9 +197,10 @@ Journal::Journal() : _curLogFileMutex("JournalLfMutex") {
_nextFileNumber = 0;
_curLogFile = 0;
_curFileId = 0;
- _preFlushTime = 0;
- _lastFlushTime = 0;
- _writeToLSNNeeded = false;
+ _lastSeqNumberWrittenToSharedView.store(0);
+ _preFlushTime.store(0);
+ _lastFlushTime.store(0);
+ _writeToLSNNeeded.store(false);
}
boost::filesystem::path Journal::getFilePathFor(int filenumber) const {
@@ -548,12 +549,9 @@ void Journal::_open() {
void Journal::init() {
verify(_curLogFile == 0);
- MongoFile::notifyPreFlush = preFlush;
- MongoFile::notifyPostFlush = postFlush;
}
void Journal::open() {
- verify(MongoFile::notifyPreFlush == preFlush);
SimpleMutex::scoped_lock lk(_curLogFileMutex);
_open();
}
@@ -610,18 +608,28 @@ unsigned long long journalReadLSN() {
return 0;
}
-unsigned long long getLastDataFileFlushTime() {
- return j.lastFlushTime();
-}
-
/** remember "last sequence number" to speed recoveries
concurrency: called by durThread only.
*/
-void Journal::updateLSNFile() {
- if (!_writeToLSNNeeded)
+void Journal::updateLSNFile(unsigned long long lsnOfCurrentJournalEntry) {
+ if (!_writeToLSNNeeded.load())
return;
- _writeToLSNNeeded = false;
+ _writeToLSNNeeded.store(false);
try {
+ // Don't read from _lastFlushTime again in this function since it may change.
+ const uint64_t copyOfLastFlushTime = _lastFlushTime.load();
+
+ // Only write an LSN that is older than the journal entry we are in the middle of writing.
+ // If this trips, it means that _lastFlushTime got ahead of what is actually in the data
+ // files because lsnOfCurrentJournalEntry includes data that hasn't yet been written to the
+ // data files.
+ if (copyOfLastFlushTime >= lsnOfCurrentJournalEntry) {
+ severe() << "Attempting to update LSNFile to " << copyOfLastFlushTime
+ << " which is not older than the current journal sequence number "
+ << lsnOfCurrentJournalEntry;
+ fassertFailed(34370);
+ }
+
// os can flush as it likes. if it flushes slowly, we will just do extra work on recovery.
// however, given we actually close the file, that seems unlikely.
File f;
@@ -631,9 +639,9 @@ void Journal::updateLSNFile() {
log() << "warning: open of lsn file failed" << endl;
return;
}
- LOG(1) << "lsn set " << _lastFlushTime << endl;
+ LOG(1) << "lsn set " << copyOfLastFlushTime << endl;
LSNFile lsnf;
- lsnf.set(_lastFlushTime);
+ lsnf.set(copyOfLastFlushTime);
f.write(0, (char*)&lsnf, sizeof(lsnf));
// do we want to fsync here? if we do it probably needs to be async so the durthread
// is not delayed.
@@ -643,13 +651,34 @@ void Journal::updateLSNFile() {
}
}
-void Journal::preFlush() {
- j._preFlushTime = Listener::getElapsedTimeMillis();
+namespace {
+SimpleMutex lastGeneratedSeqNumberMutex("lastGeneratedSeqNumberMutex");
+uint64_t lastGeneratedSeqNumber = 0;
+}
+
+uint64_t generateNextSeqNumber() {
+ const uint64_t now = Listener::getElapsedTimeMillis();
+ SimpleMutex::scoped_lock lock(lastGeneratedSeqNumberMutex);
+ if (now > lastGeneratedSeqNumber) {
+ lastGeneratedSeqNumber = now;
+ } else {
+ // Make sure we return unique monotonically increasing numbers.
+ lastGeneratedSeqNumber++;
+ }
+ return lastGeneratedSeqNumber;
}
-void Journal::postFlush() {
- j._lastFlushTime = j._preFlushTime;
- j._writeToLSNNeeded = true;
+void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber) {
+ j._lastSeqNumberWrittenToSharedView.store(seqNumber);
+}
+
+void notifyPreDataFileFlush() {
+ j._preFlushTime.store(j._lastSeqNumberWrittenToSharedView.load());
+}
+
+void notifyPostDataFileFlush() {
+ j._lastFlushTime.store(j._preFlushTime.load());
+ j._writeToLSNNeeded.store(true);
}
// call from within _curLogFileMutex
@@ -659,7 +688,7 @@ void Journal::closeCurrentJournalFile() {
JFile jf;
jf.filename = _curLogFile->_name;
- jf.lastEventTimeMs = Listener::getElapsedTimeMillis();
+ jf.lastEventTimeMs = generateNextSeqNumber();
_oldJournalFiles.push_back(jf);
delete _curLogFile; // close
@@ -678,7 +707,7 @@ void Journal::removeUnneededJournalFiles() {
// '_lastFlushTime' is the start time of the last successful flush of the data files to
// disk. We can't delete this journal file until the last successful flush time is at least
// 10 seconds after 'f.lastEventTimeMs'.
- if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime) {
+ if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime.load()) {
// eligible for deletion
boost::filesystem::path p(f.filename);
log() << "old journal file will be removed: " << f.filename << endl;
@@ -691,13 +720,12 @@ void Journal::removeUnneededJournalFiles() {
}
}
-void Journal::_rotate() {
+void Journal::_rotate(unsigned long long lsnOfCurrentJournalEntry) {
_curLogFileMutex.dassertLocked();
-
if (inShutdown() || !_curLogFile)
return;
- j.updateLSNFile();
+ j.updateLSNFile(lsnOfCurrentJournalEntry);
if (_curLogFile && _written < DataLimitPerJournalFile)
return;
@@ -785,7 +813,7 @@ void Journal::journal(const JSectHeader& h, const AlignedBuilder& uncompressed)
verify(w <= L);
stats.curr()->_journaledBytes += L;
_curLogFile->synchronousAppend((const void*)b.buf(), L);
- _rotate();
+ _rotate(h.seqNumber);
} catch (std::exception& e) {
log() << "error exception in dur::journal " << e.what() << endl;
throw;
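
Two pieces of this file carry the core reasoning: the mutex-guarded generator that keeps sequence numbers unique and strictly increasing even within a single millisecond, and the new guard in updateLSNFile() that refuses to persist a value as new as the section currently being written. A simplified sketch of both, under assumed names (nextSeqNumber and persistLsn are illustrative, not the patch's symbols):

    #include <cstdint>
    #include <cstdlib>
    #include <iostream>
    #include <mutex>

    namespace sketch {
    std::mutex seqMutex;
    uint64_t lastSeq = 0;  // guarded by seqMutex

    // Use the clock when it moved forward, otherwise bump the previous value, so
    // two sections prepared in the same millisecond (or after a clock step back)
    // still get unique, ordered numbers.
    uint64_t nextSeqNumber(uint64_t nowMillis) {
        std::lock_guard<std::mutex> lk(seqMutex);
        lastSeq = (nowMillis > lastSeq) ? nowMillis : lastSeq + 1;
        return lastSeq;
    }

    // The invariant behind fassert 34370: a value recorded in journal/lsn must be
    // strictly older than the section being journaled, since that section's data
    // cannot have reached the data files yet.
    void persistLsn(uint64_t lastFlushed, uint64_t currentSectionSeq) {
        if (lastFlushed >= currentSectionSeq) {
            std::cerr << "lsn " << lastFlushed << " is not older than current section "
                      << currentSectionSeq << "\n";
            std::abort();
        }
        // ... write lastFlushed to journal/lsn (elided) ...
    }
    }  // namespace sketch
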
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.h b/src/mongo/db/storage/mmap_v1/dur_journal.h
index 07def586090..68e3572f5a3 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journal.h
@@ -30,6 +30,8 @@
#pragma once
+#include <cstdint>
+
namespace mongo {
class AlignedBuilder;
@@ -51,12 +53,17 @@ void journalCleanup(bool log = false);
/** assure journal/ dir exists. throws */
void journalMakeDir();
-/** check if time to rotate files; assure a file is open.
- done separately from the journal() call as we can do this part
- outside of lock.
- only called by durThread.
+/**
+ * Generates the next sequence number for use in the journal, guaranteed to be greater than all
+ * prior sequence numbers.
+ */
+uint64_t generateNextSeqNumber();
+
+/**
+ * Informs the journaling system that all writes on or before the passed in sequence number have
+ * been written to the data files' shared mmap view.
*/
-void journalRotate();
+void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
/** flag that something has gone wrong during writing to the journal
(not for recovery mode)
@@ -66,8 +73,6 @@ void journalingFailure(const char* msg);
/** read lsn from disk from the last run before doing recovery */
unsigned long long journalReadLSN();
-unsigned long long getLastDataFileFlushTime();
-
/** never throws.
@param anyFiles by default we only look at j._* files. If anyFiles is true, return true
if there are any files in the journal directory. acquirePathLock() uses this to
@@ -85,5 +90,12 @@ void WRITETOJOURNAL(const JSectHeader& h, const AlignedBuilder& uncompressed);
const long long ExtraKeepTimeMs = 10000;
const unsigned JournalCommitIntervalDefault = 100;
-}
-}
+
+/**
+ * Call these before (pre) and after (post) the datafiles are flushed to disk by the DataFileSync
+ * thread. These should not be called for any other flushes.
+ */
+void notifyPreDataFileFlush();
+void notifyPostDataFileFlush();
+} // namespace dur
+} // namespace mongo
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
index 50fe9d28aa1..8b231555202 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
@@ -65,6 +65,8 @@ void WRITETODATAFILES(const JSectHeader& h, const AlignedBuilder& uncompressed)
const long long m = t.micros();
stats.curr()->_writeToDataFilesMicros += m;
+ setLastSeqNumberWrittenToSharedView(h.seqNumber);
+
LOG(4) << "journal WRITETODATAFILES " << m / 1000.0 << "ms";
}
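
The placement matters: the sequence number is published only after the section's writes have landed in the shared view, so a concurrent pre-flush snapshot can never credit data that is not in the view yet. A small sketch of that ordering (names are illustrative):

    #include <atomic>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>

    namespace sketch {
    std::atomic<uint64_t> lastSeqInSharedView{0};

    // Copy one journal section's writes into the shared mmap view, then publish
    // its sequence number; reversing the two steps would reintroduce the race.
    void applySectionToSharedView(uint64_t sectionSeq, char* sharedView,
                                  const char* data, std::size_t len) {
        std::memcpy(sharedView, data, len);
        lastSeqInSharedView.store(sectionSeq);
    }
    }  // namespace sketch
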
diff --git a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
index e51608b69e4..6a47753e977 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
@@ -30,7 +30,12 @@
#pragma once
+#include <boost/filesystem/path.hpp>
+
+#include "mongo/db/storage/mmap_v1/aligned_builder.h"
#include "mongo/db/storage/mmap_v1/dur_journalformat.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/logfile.h"
namespace mongo {
@@ -59,9 +64,6 @@ public:
boost::filesystem::path getFilePathFor(int filenumber) const;
- unsigned long long lastFlushTime() const {
- return _lastFlushTime;
- }
void cleanup(bool log); // closes and removes journal files
unsigned long long curFileId() const {
@@ -81,14 +83,14 @@ private:
/** check if time to rotate files. assure a file is open.
* internally called with every commit
*/
- void _rotate();
+ void _rotate(unsigned long long lsnOfCurrentJournalEntry);
void _open();
void closeCurrentJournalFile();
void removeUnneededJournalFiles();
- unsigned long long _written; // bytes written so far to the current journal (log) file
- unsigned _nextFileNumber;
+ unsigned long long _written = 0; // bytes written so far to the current journal (log) file
+ unsigned _nextFileNumber = 0;
SimpleMutex _curLogFileMutex;
@@ -105,13 +107,17 @@ private:
std::list<JFile> _oldJournalFiles; // use _curLogFileMutex
// lsn related
- static void preFlush();
- static void postFlush();
- unsigned long long _preFlushTime;
- // data < this time is fsynced in the datafiles (unless hard drive controller is caching)
- unsigned long long _lastFlushTime;
- bool _writeToLSNNeeded;
- void updateLSNFile();
+ friend void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
+ friend void notifyPreDataFileFlush();
+ friend void notifyPostDataFileFlush();
+ void updateLSNFile(unsigned long long lsnOfCurrentJournalEntry);
+ // data <= this time is in the shared view
+ AtomicUInt64 _lastSeqNumberWrittenToSharedView;
+ // data <= this time was in the shared view when the last flush to start started
+ AtomicUInt64 _preFlushTime;
+ // data <= this time is fsynced in the datafiles (unless hard drive controller is caching)
+ AtomicUInt64 _lastFlushTime;
+ AtomicWord<bool> _writeToLSNNeeded;
};
}
}
diff --git a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
index ea3b4e85148..d1ecdba5b99 100644
--- a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
@@ -175,7 +175,7 @@ static void _PREPLOGBUFFER(JSectHeader& h, AlignedBuilder& bb) {
// Invalidate the total length, we will fill it in later.
h.setSectionLen(0xffffffff);
- h.seqNumber = getLastDataFileFlushTime();
+ h.seqNumber = generateNextSeqNumber();
h.fileId = j.curFileId();
// Ops other than basic writes (DurOp's) go first
diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.cpp b/src/mongo/db/storage/mmap_v1/dur_recover.cpp
index c37fbd23ef7..d0e461ed0d7 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recover.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recover.cpp
@@ -255,7 +255,8 @@ RecoveryJob::RecoveryJob()
: _mx("recovery"),
_recovering(false),
_lastDataSyncedFromLastRun(0),
- _lastSeqMentionedInConsoleLog(1) {}
+ _lastSeqSkipped(0),
+ _appliedAnySections(false) {}
RecoveryJob::~RecoveryJob() {
DESTRUCTOR_GUARD(if (!_mmfs.empty()) {} close();)
@@ -384,27 +385,46 @@ void RecoveryJob::processSection(const JSectHeader* h,
LockMongoFilesShared lkFiles; // for RecoveryJob::Last
scoped_lock lk(_mx);
- // Check the footer checksum before doing anything else.
if (_recovering) {
+ // Check the footer checksum before doing anything else.
verify(((const char*)h) + sizeof(JSectHeader) == p);
if (!f->checkHash(h, len + sizeof(JSectHeader))) {
log() << "journal section checksum doesn't match";
throw JournalSectionCorruptException();
}
- }
- if (_recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
- if (h->seqNumber != _lastSeqMentionedInConsoleLog) {
- static int n;
- if (++n < 10) {
+ static uint64_t numJournalSegmentsSkipped = 0;
+ static const uint64_t kMaxSkippedSectionsToLog = 10;
+ if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
+ if (_appliedAnySections) {
+ severe() << "Journal section sequence number " << h->seqNumber
+ << " is lower than the threshold for applying ("
+ << h->seqNumber + ExtraKeepTimeMs
+ << ") but we have already applied some journal sections. This implies a "
+ << "corrupt journal file.";
+ fassertFailed(34369);
+ }
+
+ if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) {
log() << "recover skipping application of section seq:" << h->seqNumber
<< " < lsn:" << _lastDataSyncedFromLastRun << endl;
- } else if (n == 10) {
+ } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) {
log() << "recover skipping application of section more..." << endl;
}
- _lastSeqMentionedInConsoleLog = h->seqNumber;
+ _lastSeqSkipped = h->seqNumber;
+ return;
+ }
+
+ if (!_appliedAnySections) {
+ _appliedAnySections = true;
+ if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) {
+ // Log the last skipped section's sequence number if it hasn't been logged before.
+ log() << "recover final skipped journal section had sequence number "
+ << _lastSeqSkipped;
+ }
+ log() << "recover applying initial journal section with sequence number "
+ << h->seqNumber;
}
- return;
}
auto_ptr<JournalSectionIterator> i;
@@ -553,6 +573,12 @@ void RecoveryJob::go(vector<boost::filesystem::path>& files) {
}
}
+ if (_lastSeqSkipped && !_appliedAnySections) {
+ log() << "recover journal replay completed without applying any sections. "
+ << "This can happen if there were no writes after the last fsync of the data files. "
+ << "Last skipped sections had sequence number " << _lastSeqSkipped;
+ }
+
close();
if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalScanOnly) {
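
The skip-or-apply decision that processSection() now makes compresses to the following check; the enum and helper are illustrative, not the real RecoveryJob interface:

    #include <cstdint>
    #include <iostream>

    namespace sketch {
    const uint64_t kExtraKeepTimeMs = 10000;  // mirrors ExtraKeepTimeMs in dur_journal.h

    enum class Action { Skip, Apply, Fail };

    // lastDataSynced comes from journal/lsn. Sections old enough to be fully
    // reflected in the data files are skipped, but once any section has been
    // applied, encountering another "too old" section means the journal is out
    // of order (the patch fasserts with code 34369).
    Action decide(uint64_t sectionSeq, uint64_t lastDataSynced, bool appliedAnySections) {
        if (lastDataSynced > sectionSeq + kExtraKeepTimeMs) {
            if (appliedAnySections) {
                std::cerr << "old section after newer ones were applied; corrupt journal\n";
                return Action::Fail;
            }
            return Action::Skip;
        }
        return Action::Apply;
    }
    }  // namespace sketch
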
diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.h b/src/mongo/db/storage/mmap_v1/dur_recover.h
index a9f20f2e8cf..5c22249363d 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recover.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recover.h
@@ -95,7 +95,8 @@ private:
bool _recovering;
unsigned long long _lastDataSyncedFromLastRun;
- unsigned long long _lastSeqMentionedInConsoleLog;
+ unsigned long long _lastSeqSkipped;
+ bool _appliedAnySections;
static RecoveryJob& _instance;
diff --git a/src/mongo/util/mmap.cpp b/src/mongo/util/mmap.cpp
index bfcd593650d..fe2fd672f24 100644
--- a/src/mongo/util/mmap.cpp
+++ b/src/mongo/util/mmap.cpp
@@ -166,19 +166,8 @@ void MongoFile::closeAllFiles(stringstream& message) {
return total;
}
-void nullFunc() {}
-
-// callback notifications
-void (*MongoFile::notifyPreFlush)() = nullFunc;
-void (*MongoFile::notifyPostFlush)() = nullFunc;
-
/*static*/ int MongoFile::flushAll(bool sync) {
- if (sync)
- notifyPreFlush();
- int x = _flushAll(sync);
- if (sync)
- notifyPostFlush();
- return x;
+ return _flushAll(sync);
}
/*static*/ int MongoFile::_flushAll(bool sync) {
diff --git a/src/mongo/util/mmap.h b/src/mongo/util/mmap.h
index f0ce0d29139..cfa2b809760 100644
--- a/src/mongo/util/mmap.h
+++ b/src/mongo/util/mmap.h
@@ -129,10 +129,6 @@ public:
*/
static std::set<MongoFile*>& getAllFiles();
- // callbacks if you need them
- static void (*notifyPreFlush)();
- static void (*notifyPostFlush)();
-
static int flushAll(bool sync); // returns n flushed
static long long totalMappedLength();
static void closeAllFiles(std::stringstream& message);