summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js2
-rw-r--r--src/mongo/db/storage/mmap_v1/data_file_sync.cpp5
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_journal.cpp79
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_journal.h26
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_journalimpl.h32
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp2
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_recover.cpp48
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_recover.h3
-rw-r--r--src/mongo/db/storage/mmap_v1/mmap.cpp13
-rw-r--r--src/mongo/db/storage/mmap_v1/mmap.h4
11 files changed, 142 insertions, 74 deletions
diff --git a/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js b/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
index ee25b5874b5..c97d8320422 100644
--- a/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
+++ b/jstests/noPassthroughWithMongod/indexbg_restart_sigkill_secondary_noretry.js
@@ -33,7 +33,7 @@
// Set up replica set
var replTest = new ReplSetTest({ name: 'bgIndexNoRetry', nodes: 3,
- nodeOptions : {noIndexBuildRetry:""} });
+ nodeOptions : {noIndexBuildRetry:"", syncdelay:1} });
var nodenames = replTest.nodeList();
// We can't use an arbiter as the third node because the -auth test tries to log on there
diff --git a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
index cf33e24d008..4fafae825ea 100644
--- a/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
+++ b/src/mongo/db/storage/mmap_v1/data_file_sync.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/service_context.h"
#include "mongo/db/instance.h"
+#include "mongo/db/storage/mmap_v1/dur_journal.h"
#include "mongo/db/storage/mmap_v1/mmap.h"
#include "mongo/db/storage/mmap_v1/mmap_v1_options.h"
#include "mongo/db/storage/storage_options.h"
@@ -81,7 +82,11 @@ void DataFileSync::run() {
Date_t start = jsTime();
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
+
+ dur::notifyPreDataFileFlush();
int numFiles = storageEngine->flushAllFiles(true);
+ dur::notifyPostDataFileFlush();
+
time_flushing = durationCount<Milliseconds>(jsTime() - start);
_flushed(time_flushing);
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.cpp b/src/mongo/db/storage/mmap_v1/dur_journal.cpp
index 04b5182544f..14f329e0931 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal.cpp
@@ -198,9 +198,10 @@ Journal::Journal() {
_nextFileNumber = 0;
_curLogFile = 0;
_curFileId = 0;
- _preFlushTime = 0;
- _lastFlushTime = 0;
- _writeToLSNNeeded = false;
+ _lastSeqNumberWrittenToSharedView.store(0);
+ _preFlushTime.store(0);
+ _lastFlushTime.store(0);
+ _writeToLSNNeeded.store(false);
}
boost::filesystem::path Journal::getFilePathFor(int filenumber) const {
@@ -550,12 +551,9 @@ void Journal::_open() {
void Journal::init() {
verify(_curLogFile == 0);
- MongoFile::notifyPreFlush = preFlush;
- MongoFile::notifyPostFlush = postFlush;
}
void Journal::open() {
- verify(MongoFile::notifyPreFlush == preFlush);
stdx::lock_guard<SimpleMutex> lk(_curLogFileMutex);
_open();
}
@@ -612,18 +610,28 @@ unsigned long long journalReadLSN() {
return 0;
}
-unsigned long long getLastDataFileFlushTime() {
- return j.lastFlushTime();
-}
-
/** remember "last sequence number" to speed recoveries
concurrency: called by durThread only.
*/
-void Journal::updateLSNFile() {
- if (!_writeToLSNNeeded)
+void Journal::updateLSNFile(unsigned long long lsnOfCurrentJournalEntry) {
+ if (!_writeToLSNNeeded.load())
return;
- _writeToLSNNeeded = false;
+ _writeToLSNNeeded.store(false);
try {
+ // Don't read from _lastFlushTime again in this function since it may change.
+ const uint64_t copyOfLastFlushTime = _lastFlushTime.load();
+
+ // Only write an LSN that is older than the journal entry we are in the middle of writing.
+ // If this trips, it means that _lastFlushTime got ahead of what is actually in the data
+ // files because lsnOfCurrentJournalEntry includes data that hasn't yet been written to the
+ // data files.
+ if (copyOfLastFlushTime >= lsnOfCurrentJournalEntry) {
+ severe() << "Attempting to update LSNFile to " << copyOfLastFlushTime
+ << " which is not older than the current journal sequence number "
+ << lsnOfCurrentJournalEntry;
+ fassertFailed(34370);
+ }
+
// os can flush as it likes. if it flushes slowly, we will just do extra work on recovery.
// however, given we actually close the file, that seems unlikely.
File f;
@@ -633,9 +641,9 @@ void Journal::updateLSNFile() {
log() << "warning: open of lsn file failed" << endl;
return;
}
- LOG(1) << "lsn set " << _lastFlushTime << endl;
+ LOG(1) << "lsn set " << copyOfLastFlushTime << endl;
LSNFile lsnf;
- lsnf.set(_lastFlushTime);
+ lsnf.set(copyOfLastFlushTime);
f.write(0, (char*)&lsnf, sizeof(lsnf));
// do we want to fsync here? if we do it probably needs to be async so the durthread
// is not delayed.
@@ -645,13 +653,34 @@ void Journal::updateLSNFile() {
}
}
-void Journal::preFlush() {
- j._preFlushTime = Listener::getElapsedTimeMillis();
+namespace {
+stdx::mutex lastGeneratedSeqNumberMutex;
+uint64_t lastGeneratedSeqNumber = 0;
+}
+
+uint64_t generateNextSeqNumber() {
+ const uint64_t now = Listener::getElapsedTimeMillis();
+ stdx::lock_guard<stdx::mutex> lock(lastGeneratedSeqNumberMutex);
+ if (now > lastGeneratedSeqNumber) {
+ lastGeneratedSeqNumber = now;
+ } else {
+ // Make sure we return unique monotonically increasing numbers.
+ lastGeneratedSeqNumber++;
+ }
+ return lastGeneratedSeqNumber;
+}
+
+void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber) {
+ j._lastSeqNumberWrittenToSharedView.store(seqNumber);
+}
+
+void notifyPreDataFileFlush() {
+ j._preFlushTime.store(j._lastSeqNumberWrittenToSharedView.load());
}
-void Journal::postFlush() {
- j._lastFlushTime = j._preFlushTime;
- j._writeToLSNNeeded = true;
+void notifyPostDataFileFlush() {
+ j._lastFlushTime.store(j._preFlushTime.load());
+ j._writeToLSNNeeded.store(true);
}
// call from within _curLogFileMutex
@@ -661,7 +690,7 @@ void Journal::closeCurrentJournalFile() {
JFile jf;
jf.filename = _curLogFile->_name;
- jf.lastEventTimeMs = Listener::getElapsedTimeMillis();
+ jf.lastEventTimeMs = generateNextSeqNumber();
_oldJournalFiles.push_back(jf);
delete _curLogFile; // close
@@ -680,7 +709,7 @@ void Journal::removeUnneededJournalFiles() {
// '_lastFlushTime' is the start time of the last successful flush of the data files to
// disk. We can't delete this journal file until the last successful flush time is at least
// 10 seconds after 'f.lastEventTimeMs'.
- if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime) {
+ if (f.lastEventTimeMs + ExtraKeepTimeMs < _lastFlushTime.load()) {
// eligible for deletion
boost::filesystem::path p(f.filename);
log() << "old journal file will be removed: " << f.filename << endl;
@@ -693,11 +722,11 @@ void Journal::removeUnneededJournalFiles() {
}
}
-void Journal::_rotate() {
+void Journal::_rotate(unsigned long long lsnOfCurrentJournalEntry) {
if (inShutdown() || !_curLogFile)
return;
- j.updateLSNFile();
+ j.updateLSNFile(lsnOfCurrentJournalEntry);
if (_curLogFile && _written < DataLimitPerJournalFile)
return;
@@ -785,7 +814,7 @@ void Journal::journal(const JSectHeader& h, const AlignedBuilder& uncompressed)
verify(w <= L);
stats.curr()->_journaledBytes += L;
_curLogFile->synchronousAppend((const void*)b.buf(), L);
- _rotate();
+ _rotate(h.seqNumber);
} catch (std::exception& e) {
log() << "error exception in dur::journal " << e.what() << endl;
throw;
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal.h b/src/mongo/db/storage/mmap_v1/dur_journal.h
index c9911b99507..e88ea7dabb5 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journal.h
@@ -30,6 +30,8 @@
#pragma once
+#include <cstdint>
+
namespace mongo {
class AlignedBuilder;
@@ -51,12 +53,17 @@ void journalCleanup(bool log = false);
/** assure journal/ dir exists. throws */
void journalMakeDir();
-/** check if time to rotate files; assure a file is open.
- done separately from the journal() call as we can do this part
- outside of lock.
- only called by durThread.
+/**
+ * Generates the next sequence number for use in the journal, guaranteed to be greater than all
+ * prior sequence numbers.
+ */
+uint64_t generateNextSeqNumber();
+
+/**
+ * Informs the journaling system that all writes on or before the passed in sequence number have
+ * been written to the data files' shared mmap view.
*/
-void journalRotate();
+void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
/** flag that something has gone wrong during writing to the journal
(not for recovery mode)
@@ -66,8 +73,6 @@ void journalingFailure(const char* msg);
/** read lsn from disk from the last run before doing recovery */
unsigned long long journalReadLSN();
-unsigned long long getLastDataFileFlushTime();
-
/** never throws.
@param anyFiles by default we only look at j._* files. If anyFiles is true, return true
if there are any files in the journal directory. acquirePathLock() uses this to
@@ -83,5 +88,12 @@ void WRITETOJOURNAL(const JSectHeader& h, const AlignedBuilder& uncompressed);
// in case disk controller buffers writes
const long long ExtraKeepTimeMs = 10000;
+
+/**
+ * Call these before (pre) and after (post) the datafiles are flushed to disk by the DataFileSync
+ * thread. These should not be called for any other flushes.
+ */
+void notifyPreDataFileFlush();
+void notifyPostDataFileFlush();
} // namespace dur
} // namespace mongo
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
index 78b40e25313..9670aa80300 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp
@@ -64,6 +64,8 @@ void WRITETODATAFILES(const JSectHeader& h, const AlignedBuilder& uncompressed)
const long long m = t.micros();
stats.curr()->_writeToDataFilesMicros += m;
+ setLastSeqNumberWrittenToSharedView(h.seqNumber);
+
LOG(4) << "journal WRITETODATAFILES " << m / 1000.0 << "ms";
}
diff --git a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
index 77e79ccb8d1..62d28db7036 100644
--- a/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
+++ b/src/mongo/db/storage/mmap_v1/dur_journalimpl.h
@@ -30,8 +30,13 @@
#pragma once
+#include <boost/filesystem/path.hpp>
+
+#include "mongo/db/storage/mmap_v1/aligned_builder.h"
#include "mongo/db/storage/mmap_v1/dur_journalformat.h"
#include "mongo/db/storage/mmap_v1/logfile.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/concurrency/mutex.h"
namespace mongo {
namespace dur {
@@ -59,9 +64,6 @@ public:
boost::filesystem::path getFilePathFor(int filenumber) const;
- unsigned long long lastFlushTime() const {
- return _lastFlushTime;
- }
void cleanup(bool log); // closes and removes journal files
unsigned long long curFileId() const {
@@ -81,14 +83,14 @@ private:
/** check if time to rotate files. assure a file is open.
* internally called with every commit
*/
- void _rotate();
+ void _rotate(unsigned long long lsnOfCurrentJournalEntry);
void _open();
void closeCurrentJournalFile();
void removeUnneededJournalFiles();
- unsigned long long _written; // bytes written so far to the current journal (log) file
- unsigned _nextFileNumber;
+ unsigned long long _written = 0; // bytes written so far to the current journal (log) file
+ unsigned _nextFileNumber = 0;
SimpleMutex _curLogFileMutex;
@@ -105,13 +107,17 @@ private:
std::list<JFile> _oldJournalFiles; // use _curLogFileMutex
// lsn related
- static void preFlush();
- static void postFlush();
- unsigned long long _preFlushTime;
- // data < this time is fsynced in the datafiles (unless hard drive controller is caching)
- unsigned long long _lastFlushTime;
- bool _writeToLSNNeeded;
- void updateLSNFile();
+ friend void setLastSeqNumberWrittenToSharedView(uint64_t seqNumber);
+ friend void notifyPreDataFileFlush();
+ friend void notifyPostDataFileFlush();
+ void updateLSNFile(unsigned long long lsnOfCurrentJournalEntry);
+ // data <= this time is in the shared view
+ AtomicUInt64 _lastSeqNumberWrittenToSharedView;
+ // data <= this time was in the shared view when the last flush to start started
+ AtomicUInt64 _preFlushTime;
+ // data <= this time is fsynced in the datafiles (unless hard drive controller is caching)
+ AtomicUInt64 _lastFlushTime;
+ AtomicWord<bool> _writeToLSNNeeded;
};
}
}
diff --git a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
index 232e31290e5..9a68deb3752 100644
--- a/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_preplogbuffer.cpp
@@ -175,7 +175,7 @@ static void _PREPLOGBUFFER(JSectHeader& h, AlignedBuilder& bb) {
// Invalidate the total length, we will fill it in later.
h.setSectionLen(0xffffffff);
- h.seqNumber = getLastDataFileFlushTime();
+ h.seqNumber = generateNextSeqNumber();
h.fileId = j.curFileId();
// Ops other than basic writes (DurOp's) go first
diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.cpp b/src/mongo/db/storage/mmap_v1/dur_recover.cpp
index e3a686fc21e..c29036b26a3 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recover.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recover.cpp
@@ -253,7 +253,10 @@ static string fileName(const char* dbName, int fileNo) {
RecoveryJob::RecoveryJob()
- : _recovering(false), _lastDataSyncedFromLastRun(0), _lastSeqMentionedInConsoleLog(1) {}
+ : _recovering(false),
+ _lastDataSyncedFromLastRun(0),
+ _lastSeqSkipped(0),
+ _appliedAnySections(false) {}
RecoveryJob::~RecoveryJob() {
DESTRUCTOR_GUARD(if (!_mmfs.empty()) {} close();)
@@ -382,27 +385,46 @@ void RecoveryJob::processSection(const JSectHeader* h,
LockMongoFilesShared lkFiles; // for RecoveryJob::Last
stdx::lock_guard<stdx::mutex> lk(_mx);
- // Check the footer checksum before doing anything else.
if (_recovering) {
+ // Check the footer checksum before doing anything else.
verify(((const char*)h) + sizeof(JSectHeader) == p);
if (!f->checkHash(h, len + sizeof(JSectHeader))) {
log() << "journal section checksum doesn't match";
throw JournalSectionCorruptException();
}
- }
- if (_recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
- if (h->seqNumber != _lastSeqMentionedInConsoleLog) {
- static int n;
- if (++n < 10) {
+ static uint64_t numJournalSegmentsSkipped = 0;
+ static const uint64_t kMaxSkippedSectionsToLog = 10;
+ if (_lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs) {
+ if (_appliedAnySections) {
+ severe() << "Journal section sequence number " << h->seqNumber
+ << " is lower than the threshold for applying ("
+ << h->seqNumber + ExtraKeepTimeMs
+ << ") but we have already applied some journal sections. This implies a "
+ << "corrupt journal file.";
+ fassertFailed(34369);
+ }
+
+ if (++numJournalSegmentsSkipped < kMaxSkippedSectionsToLog) {
log() << "recover skipping application of section seq:" << h->seqNumber
<< " < lsn:" << _lastDataSyncedFromLastRun << endl;
- } else if (n == 10) {
+ } else if (numJournalSegmentsSkipped == kMaxSkippedSectionsToLog) {
log() << "recover skipping application of section more..." << endl;
}
- _lastSeqMentionedInConsoleLog = h->seqNumber;
+ _lastSeqSkipped = h->seqNumber;
+ return;
+ }
+
+ if (!_appliedAnySections) {
+ _appliedAnySections = true;
+ if (numJournalSegmentsSkipped >= kMaxSkippedSectionsToLog) {
+ // Log the last skipped section's sequence number if it hasn't been logged before.
+ log() << "recover final skipped journal section had sequence number "
+ << _lastSeqSkipped;
+ }
+ log() << "recover applying initial journal section with sequence number "
+ << h->seqNumber;
}
- return;
}
unique_ptr<JournalSectionIterator> i;
@@ -551,6 +573,12 @@ void RecoveryJob::go(vector<boost::filesystem::path>& files) {
}
}
+ if (_lastSeqSkipped && !_appliedAnySections) {
+ log() << "recover journal replay completed without applying any sections. "
+ << "This can happen if there were no writes after the last fsync of the data files. "
+ << "Last skipped sections had sequence number " << _lastSeqSkipped;
+ }
+
close();
if (mmapv1GlobalOptions.journalOptions & MMAPV1Options::JournalScanOnly) {
diff --git a/src/mongo/db/storage/mmap_v1/dur_recover.h b/src/mongo/db/storage/mmap_v1/dur_recover.h
index e05e7926215..39925f9063f 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recover.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recover.h
@@ -94,7 +94,8 @@ private:
bool _recovering;
unsigned long long _lastDataSyncedFromLastRun;
- unsigned long long _lastSeqMentionedInConsoleLog;
+ unsigned long long _lastSeqSkipped;
+ bool _appliedAnySections;
static RecoveryJob& _instance;
diff --git a/src/mongo/db/storage/mmap_v1/mmap.cpp b/src/mongo/db/storage/mmap_v1/mmap.cpp
index 57559d3038e..618e5777de6 100644
--- a/src/mongo/db/storage/mmap_v1/mmap.cpp
+++ b/src/mongo/db/storage/mmap_v1/mmap.cpp
@@ -166,19 +166,8 @@ void MongoFile::closeAllFiles(stringstream& message) {
return total;
}
-void nullFunc() {}
-
-// callback notifications
-void (*MongoFile::notifyPreFlush)() = nullFunc;
-void (*MongoFile::notifyPostFlush)() = nullFunc;
-
/*static*/ int MongoFile::flushAll(bool sync) {
- if (sync)
- notifyPreFlush();
- int x = _flushAll(sync);
- if (sync)
- notifyPostFlush();
- return x;
+ return _flushAll(sync);
}
/*static*/ int MongoFile::_flushAll(bool sync) {
diff --git a/src/mongo/db/storage/mmap_v1/mmap.h b/src/mongo/db/storage/mmap_v1/mmap.h
index 6413dc26127..a1435ad95d8 100644
--- a/src/mongo/db/storage/mmap_v1/mmap.h
+++ b/src/mongo/db/storage/mmap_v1/mmap.h
@@ -129,10 +129,6 @@ public:
*/
static std::set<MongoFile*>& getAllFiles();
- // callbacks if you need them
- static void (*notifyPreFlush)();
- static void (*notifyPostFlush)();
-
static int flushAll(bool sync); // returns n flushed
static long long totalMappedLength();
static void closeAllFiles(std::stringstream& message);