summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/db.cpp3
-rw-r--r--src/mongo/db/dur_journal.cpp92
-rw-r--r--src/mongo/db/dur_journal.h25
-rw-r--r--src/mongo/db/dur_journalimpl.h25
-rw-r--r--src/mongo/db/dur_preplogbuffer.cpp2
-rw-r--r--src/mongo/db/dur_recover.cpp45
-rw-r--r--src/mongo/db/dur_recover.h9
-rw-r--r--src/mongo/db/dur_writetodatafiles.cpp2
-rw-r--r--src/mongo/util/mmap.cpp11
-rw-r--r--src/mongo/util/mmap.h4
10 files changed, 144 insertions, 74 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 90a8013dba2..05347255f52 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/dbwebserver.h"
#include "mongo/db/dur.h"
+#include "mongo/db/dur_journal.h"
#include "mongo/db/index_names.h"
#include "mongo/db/index_rebuilder.h"
#include "mongo/db/initialize_server_global_state.h"
@@ -549,7 +550,9 @@ namespace mongo {
}
Date_t start = jsTime();
+ dur::notifyPreDataFileFlush();
int numFiles = MemoryMappedFile::flushAll( true );
+ dur::notifyPostDataFileFlush();
time_flushing = (int) (jsTime() - start);
_flushed(time_flushing);
diff --git a/src/mongo/db/dur_journal.cpp b/src/mongo/db/dur_journal.cpp
index a19032bc4c0..0f72a3901c1 100644
--- a/src/mongo/db/dur_journal.cpp
+++ b/src/mongo/db/dur_journal.cpp
@@ -187,9 +187,10 @@ namespace mongo {
_nextFileNumber = 0;
_curLogFile = 0;
_curFileId = 0;
- _preFlushTime = 0;
- _lastFlushTime = 0;
- _writeToLSNNeeded = false;
+ _lastSeqNumberWrittenToSharedView.store(0);
+ _preFlushTime.store(0);
+ _lastFlushTime.store(0);
+ _writeToLSNNeeded.store(false);
}
boost::filesystem::path Journal::getFilePathFor(int filenumber) const {
@@ -545,12 +546,9 @@ namespace mongo {
void Journal::init() {
verify( _curLogFile == 0 );
- MongoFile::notifyPreFlush = preFlush;
- MongoFile::notifyPostFlush = postFlush;
}
void Journal::open() {
- verify( MongoFile::notifyPreFlush == preFlush );
SimpleMutex::scoped_lock lk(_curLogFileMutex);
_open();
}
@@ -603,48 +601,78 @@ namespace mongo {
return 0;
}
- unsigned long long getLastDataFileFlushTime() {
- return j.lastFlushTime();
- }
-
/** remember "last sequence number" to speed recoveries
concurrency: called by durThread only.
*/
- void Journal::updateLSNFile() {
+ void Journal::updateLSNFile(unsigned long long lsnOfCurrentJournalEntry) {
RACECHECK
- if( !_writeToLSNNeeded )
+ if (!_writeToLSNNeeded.load())
return;
- _writeToLSNNeeded = false;
+ _writeToLSNNeeded.store(false);
try {
+ // Don't read from _lastFlushTime again in this function since it may change.
+ const uint64_t copyOfLastFlushTime = _lastFlushTime.load();
+
+ // Only write an LSN that is older than the journal entry we are in the middle of writing.
+ // If this trips, it means that _lastFlushTime got ahead of what is actually in the data
+ // files because lsnOfCurrentJournalEntry includes data that hasn't yet been written to the
+ // data files.
+ if (copyOfLastFlushTime >= lsnOfCurrentJournalEntry) {
+ severe() << "Attempting to update LSNFile to " << copyOfLastFlushTime
+ << " which is not older than the current journal sequence number "
+ << lsnOfCurrentJournalEntry;
+ fassertFailed(34370);
+ }
+
// os can flush as it likes. if it flushes slowly, we will just do extra work on recovery.
// however, given we actually close the file, that seems unlikely.
File f;
f.open(lsnPath().string().c_str());
- if( !f.is_open() ) {
+ if (!f.is_open()) {
// can get 0 if an i/o error
log() << "warning: open of lsn file failed" << endl;
return;
}
- LOG(1) << "lsn set " << _lastFlushTime << endl;
+ LOG(1) << "lsn set " << copyOfLastFlushTime << endl;
LSNFile lsnf;
- lsnf.set(_lastFlushTime);
+ lsnf.set(copyOfLastFlushTime);
f.write(0, (char*)&lsnf, sizeof(lsnf));
// do we want to fsync here? if we do it probably needs to be async so the durthread
// is not delayed.
- }
- catch(std::exception& e) {
+ } catch (std::exception& e) {
log() << "warning: write to lsn file failed " << e.what() << endl;
// keep running (ignore the error). recovery will be slow.
}
}
- void Journal::preFlush() {
- j._preFlushTime = Listener::getElapsedTimeMillis();
+ namespace {
+ SimpleMutex lastGeneratedSeqNumberMutex("lastGeneratedSeqNumberMutex");
+ uint64_t lastGeneratedSeqNumber = 0;
+ }
+
+ uint64_t generateNextSeqNumber() {
+ const uint64_t now = Listener::getElapsedTimeMillis();
+ SimpleMutex::scoped_lock lock(lastGeneratedSeqNumberMutex);
+ if (now > lastGeneratedSeqNumber) {
+ lastGeneratedSeqNumber = now;
+ } else {
+ // Make sure we return unique monotonically increasing numbers.
+ lastGeneratedSeqNumber++;
+ }
+ return lastGeneratedSeqNumber;
+ }
+
+ void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber) {
+ j._lastSeqNumberWrittenToSharedView.store(seqNumber);
+ }
+
+ void notifyPreDataFileFlush() {
+ j._preFlushTime.store(j._lastSeqNumberWrittenToSharedView.load());
}
- void Journal::postFlush() {
- j._lastFlushTime = j._preFlushTime;
- j._writeToLSNNeeded = true;
+ void notifyPostDataFileFlush() {
+ j._lastFlushTime.store(j._preFlushTime.load());
+ j._writeToLSNNeeded.store(true);
}
// call from within _curLogFileMutex
@@ -654,7 +682,7 @@ namespace mongo {
JFile jf;
jf.filename = _curLogFile->_name;
- jf.lastEventTimeMs = Listener::getElapsedTimeMillis();
+ jf.lastEventTimeMs = generateNextSeqNumber();
_oldJournalFiles.push_back(jf);
delete _curLogFile; // close
@@ -669,11 +697,11 @@ namespace mongo {
while( !_oldJournalFiles.empty() ) {
JFile f = _oldJournalFiles.front();
- // 'f.lastEventTimeMs' is the timestamp of the last thing in the journal file.
- // '_lastFlushTime' is the start time of the last successful flush of the data
- // files to disk. We can't delete this journal file until the last successful
- // flush time is at least 10 seconds after 'f.lastEventTimeMs'.
- if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime) {
+ // 'f.lastEventTimeMs' is the timestamp of the last thing in the journal file.
+ // '_lastFlushTime' is the start time of the last successful flush of the data
+ // files to disk. We can't delete this journal file until the last successful
+ // flush time is at least 10 seconds after 'f.lastEventTimeMs'.
+ if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime.load()) {
// eligible for deletion
boost::filesystem::path p( f.filename );
log() << "old journal file will be removed: " << f.filename << endl;
@@ -687,7 +715,7 @@ namespace mongo {
}
}
- void Journal::_rotate() {
+ void Journal::_rotate(unsigned long long lsnOfCurrentJournalEntry) {
RACECHECK;
@@ -696,7 +724,7 @@ namespace mongo {
if ( inShutdown() || !_curLogFile )
return;
- j.updateLSNFile();
+ j.updateLSNFile(lsnOfCurrentJournalEntry);
if( _curLogFile && _written < DataLimitPerJournalFile )
return;
@@ -785,7 +813,7 @@ namespace mongo {
verify( w <= L );
stats.curr->_journaledBytes += L;
_curLogFile->synchronousAppend((const void *) b.buf(), L);
- _rotate();
+ _rotate(h.seqNumber);
}
catch(std::exception& e) {
log() << "error exception in dur::journal " << e.what() << endl;
diff --git a/src/mongo/db/dur_journal.h b/src/mongo/db/dur_journal.h
index 9e7fb5bf4b6..f595f05c3ee 100644
--- a/src/mongo/db/dur_journal.h
+++ b/src/mongo/db/dur_journal.h
@@ -30,6 +30,8 @@
#pragma once
+#include <mongo/platform/cstdint.h>
+
namespace mongo {
class AlignedBuilder;
@@ -49,12 +51,17 @@ namespace mongo {
/** assure journal/ dir exists. throws */
void journalMakeDir();
- /** check if time to rotate files; assure a file is open.
- done separately from the journal() call as we can do this part
- outside of lock.
- only called by durThread.
+ /**
+ * Generates the next sequence number for use in the journal, guaranteed to be greater than all
+ * prior sequence numbers.
+ */
+ uint64_t generateNextSeqNumber();
+
+ /**
+ * Informs the journaling system that all writes on or before the passed in sequence number have
+ * been written to the data files' shared mmap view.
*/
- void journalRotate();
+ void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
/** flag that something has gone wrong during writing to the journal
(not for recovery mode)
@@ -64,8 +71,6 @@ namespace mongo {
/** read lsn from disk from the last run before doing recovery */
unsigned long long journalReadLSN();
- unsigned long long getLastDataFileFlushTime();
-
/** never throws.
@param anyFiles by default we only look at j._* files. If anyFiles is true, return true
if there are any files in the journal directory. acquirePathLock() uses this to
@@ -79,5 +84,11 @@ namespace mongo {
const unsigned JournalCommitIntervalDefault = 100;
+ /**
+ * Call these before (pre) and after (post) the datafiles are flushed to disk by the DataFileSync
+ * thread. These should not be called for any other flushes.
+ */
+ void notifyPreDataFileFlush();
+ void notifyPostDataFileFlush();
}
}
diff --git a/src/mongo/db/dur_journalimpl.h b/src/mongo/db/dur_journalimpl.h
index bafc0570f19..0c12a6677d5 100644
--- a/src/mongo/db/dur_journalimpl.h
+++ b/src/mongo/db/dur_journalimpl.h
@@ -30,7 +30,12 @@
#pragma once
+#include <boost/filesystem/path.hpp>
+
#include "mongo/db/dur_journalformat.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/alignedbuilder.h"
+#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/logfile.h"
namespace mongo {
@@ -59,7 +64,6 @@ namespace mongo {
boost::filesystem::path getFilePathFor(int filenumber) const;
- unsigned long long lastFlushTime() const { return _lastFlushTime; }
void cleanup(bool log); // closes and removes journal files
unsigned long long curFileId() const { return _curFileId; }
@@ -77,7 +81,7 @@ namespace mongo {
/** check if time to rotate files. assure a file is open.
* internally called with every commit
*/
- void _rotate();
+ void _rotate(unsigned long long lsnOfCurrentJournalEntry);
void _open();
void closeCurrentJournalFile();
@@ -101,12 +105,17 @@ namespace mongo {
list<JFile> _oldJournalFiles; // use _curLogFileMutex
// lsn related
- static void preFlush();
- static void postFlush();
- unsigned long long _preFlushTime;
- unsigned long long _lastFlushTime; // data < this time is fsynced in the datafiles (unless hard drive controller is caching)
- bool _writeToLSNNeeded;
- void updateLSNFile();
+ friend void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
+ friend void notifyPreDataFileFlush();
+ friend void notifyPostDataFileFlush();
+ void updateLSNFile(unsigned long long lsnOfCurrentJournalEntry);
+ // data <= this time is in the shared view
+ AtomicUInt64 _lastSeqNumberWrittenToSharedView;
+ // data <= this time was in the shared view when the last flush to start started
+ AtomicUInt64 _preFlushTime;
+ // data <= this time is fsynced in the datafiles (unless hard drive controller is caching)
+ AtomicUInt64 _lastFlushTime;
+ AtomicInt32 _writeToLSNNeeded;
};
}
diff --git a/src/mongo/db/dur_preplogbuffer.cpp b/src/mongo/db/dur_preplogbuffer.cpp
index 9686c21a526..c2f3b09adfa 100644
--- a/src/mongo/db/dur_preplogbuffer.cpp
+++ b/src/mongo/db/dur_preplogbuffer.cpp
@@ -168,7 +168,7 @@ namespace mongo {
bb.reset();
h.setSectionLen(0xffffffff); // total length, will fill in later
- h.seqNumber = getLastDataFileFlushTime();
+ h.seqNumber = generateNextSeqNumber();
h.fileId = j.curFileId();
}
diff --git a/src/mongo/db/dur_recover.cpp b/src/mongo/db/dur_recover.cpp
index 2e972b45d86..cd177222760 100644
--- a/src/mongo/db/dur_recover.cpp
+++ b/src/mongo/db/dur_recover.cpp
@@ -385,27 +385,46 @@ namespace mongo {
scoped_lock lk(_mx);
RACECHECK
- // Check the footer checksum before doing anything else.
if (_recovering) {
+ // Check the footer checksum before doing anything else.
verify( ((const char *)h) + sizeof(JSectHeader) == p );
if (!f->checkHash(h, len + sizeof(JSectHeader))) {
log() << "journal section checksum doesn't match";
throw JournalSectionCorruptException();
}
- }
- if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs ) {
- if( h->seqNumber != _lastSeqMentionedInConsoleLog ) {
- static int n;
- if( ++n < 10 ) {
- log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl;
+ static uint64_t numJournalSegmentsSkipped = 0;
+ static const uint64_t kMaxSkippedSectionsToLog = 10;
+ if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
+ if (_appliedAnySections) {
+ severe() << "Journal section sequence number " << h->seqNumber
+ << " is lower than the threshold for applying ("
+ << h->seqNumber + ExtraKeepTimeMs
+ << ") but we have already applied some journal sections. "
+ << "This implies a corrupt journal file.";
+ fassertFailed(34369);
}
- else if( n == 10 ) {
+
+ if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) {
+ log() << "recover skipping application of section seq:" << h->seqNumber
+ << " < lsn:" << _lastDataSyncedFromLastRun << endl;
+ } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) {
log() << "recover skipping application of section more..." << endl;
}
- _lastSeqMentionedInConsoleLog = h->seqNumber;
+ _lastSeqSkipped = h->seqNumber;
+ return;
+ }
+
+ if (!_appliedAnySections) {
+ _appliedAnySections = true;
+ if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) {
+ // Log the last skipped section's sequence number if it hasn't been logged before.
+ log() << "recover final skipped journal section had sequence number "
+ << _lastSeqSkipped;
+ }
+ log() << "recover applying initial journal section with sequence number "
+ << h->seqNumber;
}
- return;
}
auto_ptr<JournalSectionIterator> i;
@@ -553,6 +572,12 @@ namespace mongo {
}
}
+ if (_lastSeqSkipped && !_appliedAnySections) {
+ log() << "recover journal replay completed without applying any sections. "
+ << "This can happen if there were no writes after the last fsync of the data "
+ << "files. Last skipped sections had sequence number " << _lastSeqSkipped;
+ }
+
close();
if (storageGlobalParams.durOptions & StorageGlobalParams::DurScanOnly) {
diff --git a/src/mongo/db/dur_recover.h b/src/mongo/db/dur_recover.h
index b36c7ea6562..9ddb2550a5a 100644
--- a/src/mongo/db/dur_recover.h
+++ b/src/mongo/db/dur_recover.h
@@ -57,7 +57,11 @@ namespace mongo {
} last;
public:
RecoveryJob() : _lastDataSyncedFromLastRun(0),
- _mx("recovery"), _recovering(false) { _lastSeqMentionedInConsoleLog = 1; }
+ _lastSeqSkipped(0),
+ _appliedAnySections(false),
+ _mx("recovery"),
+ _recovering(false) {}
+
void go(vector<boost::filesystem::path>& files);
~RecoveryJob();
@@ -79,7 +83,8 @@ namespace mongo {
list<boost::shared_ptr<DurableMappedFile> > _mmfs;
unsigned long long _lastDataSyncedFromLastRun;
- unsigned long long _lastSeqMentionedInConsoleLog;
+ unsigned long long _lastSeqSkipped;
+ bool _appliedAnySections;
public:
mongo::mutex _mx; // protects _mmfs
private:
diff --git a/src/mongo/db/dur_writetodatafiles.cpp b/src/mongo/db/dur_writetodatafiles.cpp
index d9a2e345115..6d823c7aa07 100644
--- a/src/mongo/db/dur_writetodatafiles.cpp
+++ b/src/mongo/db/dur_writetodatafiles.cpp
@@ -31,6 +31,7 @@
#include "mongo/pch.h"
#include "mongo/db/dur_commitjob.h"
+#include "mongo/db/dur_journal.h"
#include "mongo/db/dur_recover.h"
#include "mongo/db/dur_stats.h"
#include "mongo/util/concurrency/mutex.h"
@@ -100,6 +101,7 @@ namespace mongo {
WRITETODATAFILES_Impl1(h, uncompressed);
unsigned long long m = t.micros();
stats.curr->_writeToDataFilesMicros += m;
+ setLastSeqNumberWrittenToSharedView(h.seqNumber);
LOG(2) << "journal WRITETODATAFILES " << m / 1000.0 << "ms" << endl;
}
diff --git a/src/mongo/util/mmap.cpp b/src/mongo/util/mmap.cpp
index b4b9435a7f5..f021cd55022 100644
--- a/src/mongo/util/mmap.cpp
+++ b/src/mongo/util/mmap.cpp
@@ -138,17 +138,8 @@ namespace {
return total;
}
- void nullFunc() { }
-
- // callback notifications
- void (*MongoFile::notifyPreFlush)() = nullFunc;
- void (*MongoFile::notifyPostFlush)() = nullFunc;
-
/*static*/ int MongoFile::flushAll( bool sync ) {
- if ( sync ) notifyPreFlush();
- int x = _flushAll(sync);
- if ( sync ) notifyPostFlush();
- return x;
+ return _flushAll(sync);
}
/*static*/ int MongoFile::_flushAll( bool sync ) {
diff --git a/src/mongo/util/mmap.h b/src/mongo/util/mmap.h
index 9d24da27c9a..33df258ad28 100644
--- a/src/mongo/util/mmap.h
+++ b/src/mongo/util/mmap.h
@@ -102,10 +102,6 @@ namespace mongo {
*/
static std::set<MongoFile*>& getAllFiles();
- // callbacks if you need them
- static void (*notifyPreFlush)();
- static void (*notifyPostFlush)();
-
static int flushAll( bool sync ); // returns n flushed
static long long totalMappedLength();
static void closeAllFiles( std::stringstream &message );