diff options
Diffstat (limited to 'src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp')
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp | 419 |
1 files changed, 201 insertions, 218 deletions
diff --git a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp index 4c6eb8ec8cc..971f2aa0e60 100644 --- a/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_journal_writer.cpp @@ -47,268 +47,251 @@ namespace dur { namespace { - /** - * Apply the writes back to the non-private MMF after they are for certain in the journal. - * - * (1) TODO we don't need to write back everything every group commit. We MUST write back that - * which is going to be a remapped on its private view - but that might not be all views. - * - * (2) TODO should we do this using N threads? Would be quite easy see Hackenberg paper table - * 5 and 6. 2 threads might be a good balance. - */ - void WRITETODATAFILES(const JSectHeader& h, const AlignedBuilder& uncompressed) { - Timer t; - - LOG(4) << "WRITETODATAFILES BEGIN"; - - RecoveryJob::get().processSection(&h, uncompressed.buf(), uncompressed.len(), NULL); - - const long long m = t.micros(); - stats.curr()->_writeToDataFilesMicros += m; - - LOG(4) << "journal WRITETODATAFILES " << m / 1000.0 << "ms"; - } - -} // namespace - +/** + * Apply the writes back to the non-private MMF after they are for certain in the journal. + * + * (1) TODO we don't need to write back everything every group commit. We MUST write back that + * which is going to be a remapped on its private view - but that might not be all views. + * + * (2) TODO should we do this using N threads? Would be quite easy see Hackenberg paper table + * 5 and 6. 2 threads might be a good balance. + */ +void WRITETODATAFILES(const JSectHeader& h, const AlignedBuilder& uncompressed) { + Timer t; - /** - * Used inside the journal writer thread to ensure that used buffers are cleaned up properly. - */ - class BufferGuard { - MONGO_DISALLOW_COPYING(BufferGuard); - public: - BufferGuard(JournalWriter::Buffer* buffer, JournalWriter::BufferQueue* bufferQueue) - : _buffer(buffer), - _bufferQueue(bufferQueue) { + LOG(4) << "WRITETODATAFILES BEGIN"; - } + RecoveryJob::get().processSection(&h, uncompressed.buf(), uncompressed.len(), NULL); - ~BufferGuard() { - // This buffer is done. Reset and remove it from the journal queue and put it on - // the ready queue. - _buffer->_reset(); + const long long m = t.micros(); + stats.curr()->_writeToDataFilesMicros += m; - // This should never block. Otherwise we will stall the journaling pipeline - // permanently and cause deadlock. - invariant(_bufferQueue->count() < _bufferQueue->maxSize()); - _bufferQueue->push(_buffer); - } + LOG(4) << "journal WRITETODATAFILES " << m / 1000.0 << "ms"; +} - private: - // Buffer that this scoped object is managing. Owned until destruction time. Then, the - // bufferQueue owns it. - JournalWriter::Buffer* const _buffer; +} // namespace - // Queue where the buffer should be returned to at destruction time. Not owned. - JournalWriter::BufferQueue* const _bufferQueue; - }; +/** + * Used inside the journal writer thread to ensure that used buffers are cleaned up properly. + */ +class BufferGuard { + MONGO_DISALLOW_COPYING(BufferGuard); + +public: + BufferGuard(JournalWriter::Buffer* buffer, JournalWriter::BufferQueue* bufferQueue) + : _buffer(buffer), _bufferQueue(bufferQueue) {} + + ~BufferGuard() { + // This buffer is done. Reset and remove it from the journal queue and put it on + // the ready queue. + _buffer->_reset(); + + // This should never block. Otherwise we will stall the journaling pipeline + // permanently and cause deadlock. + invariant(_bufferQueue->count() < _bufferQueue->maxSize()); + _bufferQueue->push(_buffer); + } - // - // JournalWriter - // +private: + // Buffer that this scoped object is managing. Owned until destruction time. Then, the + // bufferQueue owns it. + JournalWriter::Buffer* const _buffer; + + // Queue where the buffer should be returned to at destruction time. Not owned. + JournalWriter::BufferQueue* const _bufferQueue; +}; + + +// +// JournalWriter +// + +JournalWriter::JournalWriter(NotifyAll* commitNotify, + NotifyAll* applyToDataFilesNotify, + size_t numBuffers) + : _commitNotify(commitNotify), + _applyToDataFilesNotify(applyToDataFilesNotify), + _shutdownRequested(false), + _journalQueue(numBuffers), + _lastCommitNumber(0), + _readyQueue(numBuffers) { + invariant(_journalQueue.maxSize() == _readyQueue.maxSize()); +} + +JournalWriter::~JournalWriter() { + // Never close the journal writer with outstanding or unaccounted writes + invariant(_journalQueue.empty()); + invariant(_readyQueue.empty()); +} + +void JournalWriter::start() { + // Do not allow reuse + invariant(!_shutdownRequested); + + // Pre-allocate the journal buffers and push them on the ready queue + for (size_t i = 0; i < _readyQueue.maxSize(); i++) { + _readyQueue.push(new Buffer(InitialBufferSizeBytes)); + } - JournalWriter::JournalWriter(NotifyAll* commitNotify, - NotifyAll* applyToDataFilesNotify, - size_t numBuffers) - : _commitNotify(commitNotify), - _applyToDataFilesNotify(applyToDataFilesNotify), - _shutdownRequested(false), - _journalQueue(numBuffers), - _lastCommitNumber(0), - _readyQueue(numBuffers) { + // Start the thread + stdx::thread t(stdx::bind(&JournalWriter::_journalWriterThread, this)); + _journalWriterThreadHandle.swap(t); +} - invariant(_journalQueue.maxSize() == _readyQueue.maxSize()); - } +void JournalWriter::shutdown() { + // There is no reason to call shutdown multiple times + invariant(!_shutdownRequested); + _shutdownRequested = true; - JournalWriter::~JournalWriter() { - // Never close the journal writer with outstanding or unaccounted writes - invariant(_journalQueue.empty()); - invariant(_readyQueue.empty()); - } + // Never terminate the journal writer with outstanding or unaccounted writes + assertIdle(); - void JournalWriter::start() { - // Do not allow reuse - invariant(!_shutdownRequested); + Buffer* const shutdownBuffer = newBuffer(); + shutdownBuffer->_setShutdown(); - // Pre-allocate the journal buffers and push them on the ready queue - for (size_t i = 0; i < _readyQueue.maxSize(); i++) { - _readyQueue.push(new Buffer(InitialBufferSizeBytes)); - } + // This will terminate the journal thread. No need to specify commit number, since we are + // shutting down and nothing will be notified anyways. + writeBuffer(shutdownBuffer, 0); - // Start the thread - stdx::thread t(stdx::bind(&JournalWriter::_journalWriterThread, this)); - _journalWriterThreadHandle.swap(t); - } + // Ensure the journal thread has stopped and everything accounted for. + _journalWriterThreadHandle.join(); + assertIdle(); - void JournalWriter::shutdown() { - // There is no reason to call shutdown multiple times - invariant(!_shutdownRequested); - _shutdownRequested = true; - - // Never terminate the journal writer with outstanding or unaccounted writes - assertIdle(); - - Buffer* const shutdownBuffer = newBuffer(); - shutdownBuffer->_setShutdown(); - - // This will terminate the journal thread. No need to specify commit number, since we are - // shutting down and nothing will be notified anyways. - writeBuffer(shutdownBuffer, 0); - - // Ensure the journal thread has stopped and everything accounted for. - _journalWriterThreadHandle.join(); - assertIdle(); - - // Delete the buffers (this deallocates the journal buffer memory) - while (!_readyQueue.empty()) { - Buffer* const buffer = _readyQueue.blockingPop(); - delete buffer; - } + // Delete the buffers (this deallocates the journal buffer memory) + while (!_readyQueue.empty()) { + Buffer* const buffer = _readyQueue.blockingPop(); + delete buffer; } +} - void JournalWriter::assertIdle() { - // All buffers are in the ready queue means there is nothing pending. - invariant(_journalQueue.empty()); - invariant(_readyQueue.count() == _readyQueue.maxSize()); - } +void JournalWriter::assertIdle() { + // All buffers are in the ready queue means there is nothing pending. + invariant(_journalQueue.empty()); + invariant(_readyQueue.count() == _readyQueue.maxSize()); +} - JournalWriter::Buffer* JournalWriter::newBuffer() { - Buffer* const buffer = _readyQueue.blockingPop(); - buffer->_assertEmpty(); +JournalWriter::Buffer* JournalWriter::newBuffer() { + Buffer* const buffer = _readyQueue.blockingPop(); + buffer->_assertEmpty(); - return buffer; - } + return buffer; +} - void JournalWriter::writeBuffer(Buffer* buffer, NotifyAll::When commitNumber) { - invariant(buffer->_commitNumber == 0); - invariant((commitNumber > _lastCommitNumber) || - (buffer->_isShutdown && (commitNumber == 0))); +void JournalWriter::writeBuffer(Buffer* buffer, NotifyAll::When commitNumber) { + invariant(buffer->_commitNumber == 0); + invariant((commitNumber > _lastCommitNumber) || (buffer->_isShutdown && (commitNumber == 0))); - buffer->_commitNumber = commitNumber; + buffer->_commitNumber = commitNumber; - _journalQueue.push(buffer); - } + _journalQueue.push(buffer); +} - void JournalWriter::flush() { - std::vector<Buffer*> buffers; +void JournalWriter::flush() { + std::vector<Buffer*> buffers; - // Pop the expected number of buffers from the ready queue. This will block until all - // in-progress buffers have completed. - for (size_t i = 0; i < _readyQueue.maxSize(); i++) { - buffers.push_back(_readyQueue.blockingPop()); - } + // Pop the expected number of buffers from the ready queue. This will block until all + // in-progress buffers have completed. + for (size_t i = 0; i < _readyQueue.maxSize(); i++) { + buffers.push_back(_readyQueue.blockingPop()); + } - // Put them back in to restore the original state. - for (size_t i = 0; i < buffers.size(); i++) { - _readyQueue.push(buffers[i]); - } + // Put them back in to restore the original state. + for (size_t i = 0; i < buffers.size(); i++) { + _readyQueue.push(buffers[i]); } +} - void JournalWriter::_journalWriterThread() { - Client::initThread("journal writer"); +void JournalWriter::_journalWriterThread() { + Client::initThread("journal writer"); - log() << "Journal writer thread started"; + log() << "Journal writer thread started"; - try { - while (true) { - Buffer* const buffer = _journalQueue.blockingPop(); - BufferGuard bufferGuard(buffer, &_readyQueue); + try { + while (true) { + Buffer* const buffer = _journalQueue.blockingPop(); + BufferGuard bufferGuard(buffer, &_readyQueue); - if (buffer->_isShutdown) { - invariant(buffer->_builder.len() == 0); + if (buffer->_isShutdown) { + invariant(buffer->_builder.len() == 0); - // The journal writer thread is terminating. Nothing to notify or write. - break; - } + // The journal writer thread is terminating. Nothing to notify or write. + break; + } - if (buffer->_isNoop) { - invariant(buffer->_builder.len() == 0); + if (buffer->_isNoop) { + invariant(buffer->_builder.len() == 0); - // There's nothing to be writen, but we still need to notify this commit number - _commitNotify->notifyAll(buffer->_commitNumber); - _applyToDataFilesNotify->notifyAll(buffer->_commitNumber); - continue; - } + // There's nothing to be writen, but we still need to notify this commit number + _commitNotify->notifyAll(buffer->_commitNumber); + _applyToDataFilesNotify->notifyAll(buffer->_commitNumber); + continue; + } - LOG(4) << "Journaling commit number " << buffer->_commitNumber - << " (journal file " << buffer->_header.fileId - << ", sequence " << buffer->_header.seqNumber - << ", size " << buffer->_builder.len() << " bytes)"; + LOG(4) << "Journaling commit number " << buffer->_commitNumber << " (journal file " + << buffer->_header.fileId << ", sequence " << buffer->_header.seqNumber + << ", size " << buffer->_builder.len() << " bytes)"; - // This performs synchronous I/O to the journal file and will block. - WRITETOJOURNAL(buffer->_header, buffer->_builder); + // This performs synchronous I/O to the journal file and will block. + WRITETOJOURNAL(buffer->_header, buffer->_builder); - // Data is now persisted in the journal, which is sufficient for acknowledging - // getLastError - _commitNotify->notifyAll(buffer->_commitNumber); + // Data is now persisted in the journal, which is sufficient for acknowledging + // getLastError + _commitNotify->notifyAll(buffer->_commitNumber); - // Apply the journal entries on top of the shared view so that when flush is - // requested it would write the latest. - WRITETODATAFILES(buffer->_header, buffer->_builder); + // Apply the journal entries on top of the shared view so that when flush is + // requested it would write the latest. + WRITETODATAFILES(buffer->_header, buffer->_builder); - // Data is now persisted on the shared view, so notify any potential journal file - // cleanup waiters. - _applyToDataFilesNotify->notifyAll(buffer->_commitNumber); - } + // Data is now persisted on the shared view, so notify any potential journal file + // cleanup waiters. + _applyToDataFilesNotify->notifyAll(buffer->_commitNumber); } - catch (const DBException& e) { - severe() << "dbexception in journalWriterThread causing immediate shutdown: " - << e.toString(); - invariant(false); - } - catch (const std::ios_base::failure& e) { - severe() << "ios_base exception in journalWriterThread causing immediate shutdown: " - << e.what(); - invariant(false); - } - catch (const std::bad_alloc& e) { - severe() << "bad_alloc exception in journalWriterThread causing immediate shutdown: " - << e.what(); - invariant(false); - } - catch (const std::exception& e) { - severe() << "exception in journalWriterThread causing immediate shutdown: " - << e.what(); - invariant(false); - } - catch (...) { - severe() << "unhandled exception in journalWriterThread causing immediate shutdown"; - invariant(false); - } - - log() << "Journal writer thread stopped"; + } catch (const DBException& e) { + severe() << "dbexception in journalWriterThread causing immediate shutdown: " + << e.toString(); + invariant(false); + } catch (const std::ios_base::failure& e) { + severe() << "ios_base exception in journalWriterThread causing immediate shutdown: " + << e.what(); + invariant(false); + } catch (const std::bad_alloc& e) { + severe() << "bad_alloc exception in journalWriterThread causing immediate shutdown: " + << e.what(); + invariant(false); + } catch (const std::exception& e) { + severe() << "exception in journalWriterThread causing immediate shutdown: " << e.what(); + invariant(false); + } catch (...) { + severe() << "unhandled exception in journalWriterThread causing immediate shutdown"; + invariant(false); } + log() << "Journal writer thread stopped"; +} - // - // Buffer - // - - JournalWriter::Buffer::Buffer(size_t initialSize) - : _commitNumber(0), - _isNoop(false), - _isShutdown(false), - _header(), - _builder(initialSize) { - } +// +// Buffer +// - JournalWriter::Buffer::~Buffer() { - _assertEmpty(); - } +JournalWriter::Buffer::Buffer(size_t initialSize) + : _commitNumber(0), _isNoop(false), _isShutdown(false), _header(), _builder(initialSize) {} - void JournalWriter::Buffer::_assertEmpty() { - invariant(_commitNumber == 0); - invariant(_builder.len() == 0); - } +JournalWriter::Buffer::~Buffer() { + _assertEmpty(); +} - void JournalWriter::Buffer::_reset() { - _commitNumber = 0; - _isNoop = false; - _builder.reset(); - } +void JournalWriter::Buffer::_assertEmpty() { + invariant(_commitNumber == 0); + invariant(_builder.len() == 0); +} + +void JournalWriter::Buffer::_reset() { + _commitNumber = 0; + _isNoop = false; + _builder.reset(); +} -} // namespace dur -} // namespace mongo +} // namespace dur +} // namespace mongo |