author     Geert Bosch <geert@mongodb.com>    2015-03-26 12:30:27 -0400
committer  Geert Bosch <geert@mongodb.com>    2015-03-31 15:27:46 -0400
commit     b87a1c395527b0981b6613d6ecf949f2c7465ad8 (patch)
tree       02ff5c90b21b053dc9dbe52687599796adc54a0b
parent     5006eb81f94ce90eebb68828b299761b622f0ec1 (diff)
download   mongo-b87a1c395527b0981b6613d6ecf949f2c7465ad8.tar.gz
SERVER-17616: Bound memory usage of MMAPv1 rollback buffers.
(backported from commit 991ccba6e29ea5b7f51a6ed4a549b8a9291a209b)
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp  284
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recovery_unit.h    119
2 files changed, 304 insertions, 99 deletions
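
The patch below replaces the single rollback-buffer vector with a two-tier scheme: the first few MB of pre-images are appended unconditionally (fast path), after which writes go through a merging path that keeps only non-overlapping, not-yet-covered ranges in a std::set ordered by end address. The following standalone sketch (not MongoDB code; MergedPreimages and noteWrite are invented names for illustration) shows that merging idea in isolation:

#include <cstddef>
#include <iostream>
#include <set>
#include <string>

struct Write {
    char* addr;
    size_t len;
    size_t offset;  // position of this range's pre-image in the buffer
    char* end() const { return addr + len; }
};

// Order by end pointer: upper_bound(Write{addr, 0, 0}) yields the first
// stored range whose end lies strictly beyond addr, i.e. the first range
// that could cover addr.
struct CompareEnd {
    bool operator()(const Write& a, const Write& b) const { return a.end() < b.end(); }
};

class MergedPreimages {
public:
    // Save pre-images for exactly the parts of [addr, addr + len) not yet covered.
    void noteWrite(char* addr, size_t len) {
        char* const end = addr + len;
        std::set<Write, CompareEnd>::iterator covering =
            _writes.upper_bound(Write{addr, 0, 0});
        while (addr < end) {
            if (covering != _writes.end() && covering->addr <= addr) {
                if (end <= covering->end())
                    return;              // remainder fully covered; the oldest pre-image wins
                addr = covering->end();  // skip the covered prefix
                ++covering;
                continue;
            }
            // [addr, uncoveredEnd) has no pre-image yet; record one.
            char* const uncoveredEnd =
                (covering != _writes.end() && covering->addr < end) ? covering->addr : end;
            const size_t off = _preimages.size();
            _preimages.append(addr, uncoveredEnd - addr);
            _writes.insert(Write{addr, size_t(uncoveredEnd - addr), off});
            addr = uncoveredEnd;
        }
    }

    // Restore every saved pre-image; ranges never overlap, so order is irrelevant.
    void rollback() const {
        for (std::set<Write, CompareEnd>::const_iterator it = _writes.begin();
             it != _writes.end(); ++it)
            _preimages.copy(it->addr, it->len, it->offset);
    }

    size_t preimageBytes() const { return _preimages.size(); }

private:
    std::set<Write, CompareEnd> _writes;  // non-overlapping, ordered by end address
    std::string _preimages;               // concatenated pre-images
};

int main() {
    char data[16] = "hello, mongodb!";
    MergedPreimages ru;
    ru.noteWrite(data, 5);      // saves "hello"
    ru.noteWrite(data + 2, 6);  // overlaps; only bytes [5, 8) are newly saved
    data[0] = 'H';
    data[7] = 'X';
    ru.rollback();
    // Prints: hello, mongodb! (8 pre-image bytes)
    std::cout << data << " (" << ru.preimageBytes() << " pre-image bytes)\n";
}

Because stored ranges never overlap and the oldest pre-image always wins, restore order does not matter for the merged set; in the real patch only the unmerged initial writes still need LIFO rollback.
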
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
index 6382fcf5f83..af4d93ba039 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
@@ -33,6 +33,9 @@
 #include "mongo/db/storage/mmap_v1/dur_recovery_unit.h"

 #include <algorithm>
+#include <limits>
+#include <map>
+#include <set>
 #include <string>

 #include "mongo/db/storage/mmap_v1/dur.h"
@@ -42,12 +45,12 @@

 namespace mongo {

-    DurRecoveryUnit::DurRecoveryUnit() : _mustRollback(false), _rollbackDisabled(false) {
-
+    DurRecoveryUnit::DurRecoveryUnit()
+        : _writeCount(0), _writeBytes(0), _mustRollback(false), _rollbackWritesDisabled(false) {
     }

     void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
-        _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writes.size()));
+        _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writeCount));
     }

     void DurRecoveryUnit::commitUnitOfWork() {
@@ -58,9 +61,9 @@
         // If we are nested, make all changes for this level part of the containing UnitOfWork.
         // They will be added to the global damages list once the outermost UnitOfWork commits,
         // which it must now do.
-        if (haveUncommitedChangesAtCurrentLevel()) {
+        if (haveUncommittedChangesAtCurrentLevel()) {
             _startOfUncommittedChangesForLevel.back() =
-                Indexes(_changes.size(), _writes.size());
+                Indexes(_changes.size(), _writeCount);
         }
         return;
     }
@@ -74,18 +77,17 @@
     void DurRecoveryUnit::endUnitOfWork() {
         invariant(inAUnitOfWork());

-        if (haveUncommitedChangesAtCurrentLevel()) {
-            rollbackInnermostChanges();
+        if (haveUncommittedChangesAtCurrentLevel()) {
+            _mustRollback = true;
         }

         // Reset back to default if this is the last unwind of the recovery unit. That way, it can
         // be reused for new operations.
         if (inOutermostUnitOfWork()) {
-            dassert(_changes.empty());
-            dassert(_writes.empty());
-            _preimageBuffer.clear();
-            _mustRollback = false;
-            _rollbackDisabled = false;
+            rollbackChanges();
+            _rollbackWritesDisabled = false;
+            dassert(_changes.empty() && _initialWrites.empty() && _mergedWrites.empty());
+            dassert(_preimageBuffer.empty() && !_writeCount && !_writeBytes && !_mustRollback);
         }

         _startOfUncommittedChangesForLevel.pop_back();
@@ -97,98 +99,137 @@
     }

     void DurRecoveryUnit::commitChanges() {
-        if (!inAUnitOfWork())
-            return;
-
         invariant(!_mustRollback);
         invariant(inOutermostUnitOfWork());
         invariant(_startOfUncommittedChangesForLevel.front().changeIndex == 0);
-        invariant(_startOfUncommittedChangesForLevel.front().writeIndex == 0);
+        invariant(_startOfUncommittedChangesForLevel.front().writeCount == 0);

         if (getDur().isDurable())
-            pushChangesToDurSubSystem();
+            markWritesForJournaling();

-        for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
+        for (Changes::const_iterator it = _changes.begin(), end = _changes.end();
+             it != end; ++it) {
             (*it)->commit();
         }

-        // We now reset to a "clean" state without any uncommitted changes.
-        _changes.clear();
-        _writes.clear();
-        _preimageBuffer.clear();
+        resetChanges();
     }

-    void DurRecoveryUnit::pushChangesToDurSubSystem() {
-        if (_writes.empty())
+    void DurRecoveryUnit::markWritesForJournaling() {
+        if (!_writeCount)
             return;

         typedef std::pair<void*, unsigned> Intent;
         std::vector<Intent> intents;
-        intents.reserve(_writes.size());
-
-        // orders by addr so we can coalesce overlapping and adjacent writes
-        std::sort(_writes.begin(), _writes.end());
-
-        intents.push_back(std::make_pair(_writes.front().addr, _writes.front().len));
-        for (Writes::iterator it = (_writes.begin() + 1), end = _writes.end(); it != end; ++it) {
-            Intent& lastIntent = intents.back();
-            char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
-            if (it->addr <= lastEnd) {
-                // overlapping or adjacent, so extend.
-                ptrdiff_t extendedLen = (it->addr + it->len) - static_cast<char*>(lastIntent.first);
-                lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
+        const size_t numStoredWrites = _initialWrites.size() + _mergedWrites.size();
+        intents.reserve(numStoredWrites);
+
+        // Show very large units of work at LOG(1) level as they may hint at performance issues
+        const int logLevel = (_writeCount > 100*1000 || _writeBytes > 50*1024*1024) ? 1 : 3;
+
+        LOG(logLevel) << _writeCount << " writes (" << _writeBytes / 1024 << " kB) covered by "
+                      << numStoredWrites << " pre-images ("
+                      << _preimageBuffer.size() / 1024 << " kB) ";
+
+        // orders the initial, unmerged writes, by address so we can coalesce overlapping and
+        // adjacent writes
+        std::sort(_initialWrites.begin(), _initialWrites.end());
+
+        if (!_initialWrites.empty()) {
+            intents.push_back(std::make_pair(_initialWrites.front().addr,
+                                             _initialWrites.front().len));
+            for (InitialWrites::iterator it = (_initialWrites.begin() + 1),
+                                         end = _initialWrites.end();
+                 it != end;
+                 ++it) {
+                Intent& lastIntent = intents.back();
+                char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
+                if (it->addr <= lastEnd) {
+                    // overlapping or adjacent, so extend.
+                    ptrdiff_t extendedLen = (it->end()) - static_cast<char*>(lastIntent.first);
+                    lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
+                }
+                else {
+                    // not overlapping, so create a new intent
+                    intents.push_back(std::make_pair(it->addr, it->len));
+                }
             }
-            else {
-                // not overlapping, so create a new intent
-                intents.push_back(std::make_pair(it->addr, it->len));
+        }
+
+        MergedWrites::iterator it = _mergedWrites.begin();
+        if (it != _mergedWrites.end()) {
+            intents.push_back(std::make_pair(it->addr, it->len));
+            while (++it != _mergedWrites.end()) {
+                // Check the property that write intents are sorted and don't overlap.
+                invariant(it->addr >= intents.back().first);
+                Intent& lastIntent = intents.back();
+                char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
+                if (it->addr == lastEnd) {
+                    // adjacent, so extend.
+                    lastIntent.second += it->len;
+                }
+                else {
+                    // not overlapping, so create a new intent
+                    invariant(it->addr > lastEnd);
+                    intents.push_back(std::make_pair(it->addr, it->len));
+                }
             }
         }
+        LOG(logLevel) << _mergedWrites.size() << " pre-images " << "coalesced into "
+                      << intents.size() << " write intents";

         getDur().declareWriteIntents(intents);
     }

-    void DurRecoveryUnit::rollbackInnermostChanges() {
-        // Using signed ints to avoid issues in loops below around index 0.
-        invariant(_changes.size() <= size_t(std::numeric_limits<int>::max()));
-        invariant(_writes.size() <= size_t(std::numeric_limits<int>::max()));
-        const int changesRollbackTo = _startOfUncommittedChangesForLevel.back().changeIndex;
-        const int writesRollbackTo = _startOfUncommittedChangesForLevel.back().writeIndex;
+    void DurRecoveryUnit::resetChanges() {
+        _writeCount = 0;
+        _writeBytes = 0;
+        _initialWrites.clear();
+        _mergedWrites.clear();
+        _changes.clear();
+        _preimageBuffer.clear();
+    }
+
+    void DurRecoveryUnit::rollbackChanges() {
+        invariant(inOutermostUnitOfWork());
+        invariant(!_startOfUncommittedChangesForLevel.back().changeIndex);
+        invariant(!_startOfUncommittedChangesForLevel.back().writeCount);

         // First rollback disk writes, then Changes. This matches behavior in other storage engines
         // that either rollback a transaction or don't write a writebatch.
-        if (!_rollbackDisabled) {
-            LOG(2) << " ***** ROLLING BACK " << (_writes.size() - writesRollbackTo)
-                   << " disk writes";
+        if (_rollbackWritesDisabled) {
+            LOG(2) << " ***** NOT ROLLING BACK " << _writeCount << " disk writes";
+        }
+        else {
+            LOG(2) << " ***** ROLLING BACK " << _writeCount << " disk writes";
+
+            // First roll back the merged writes. These have no overlap or ordering requirement
+            // other than needing to be rolled back before all _initialWrites.
+            for (MergedWrites::iterator it = _mergedWrites.begin();
+                 it != _mergedWrites.end();
+                 ++it) {
+                _preimageBuffer.copy(it->addr, it->len, it->offset);
+            }

-            for (int i = _writes.size() - 1; i >= writesRollbackTo; i--) {
-                // TODO need to add these pages to our "dirty count" somehow.
-                _preimageBuffer.copy(_writes[i].addr, _writes[i].len, _writes[i].offset);
+            // Then roll back the initial writes in LIFO order, as these might have overlaps.
+            for (InitialWrites::reverse_iterator rit = _initialWrites.rbegin();
+                 rit != _initialWrites.rend();
+                 ++rit) {
+                _preimageBuffer.copy(rit->addr, rit->len, rit->offset);
             }
         }

-        LOG(2) << " ***** ROLLING BACK " << (_changes.size() - changesRollbackTo)
-               << " custom changes";
-        for (int i = _changes.size() - 1; i >= changesRollbackTo; i--) {
+        LOG(2) << " ***** ROLLING BACK " << (_changes.size()) << " custom changes";
+
+        for (int i = _changes.size() - 1; i >= 0; i--) {
             LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i]));
             _changes[i]->rollback();
         }

-        _writes.erase(_writes.begin() + writesRollbackTo, _writes.end());
-        _changes.erase(_changes.begin() + changesRollbackTo, _changes.end());
-
-        if (inOutermostUnitOfWork()) {
-            // We just rolled back so we are now "clean" and don't need to roll back anymore.
-            invariant(_changes.empty());
-            invariant(_writes.empty());
-            _preimageBuffer.clear();
-            _mustRollback = false;
-        }
-        else {
-            // Inner UOW rolled back, so outer must not commit. We can loosen this in the future,
-            // but that would require all StorageEngines to support rollback of nested transactions.
-            _mustRollback = true;
-        }
+        resetChanges();
+        _mustRollback = false;
     }

     bool DurRecoveryUnit::awaitCommit() {
@@ -196,26 +237,115 @@
         return getDur().awaitCommit();
     }

-    void* DurRecoveryUnit::writingPtr(void* data, size_t len) {
+    void DurRecoveryUnit::mergingWritingPtr(char* addr, size_t len) {
+        // The invariant is that all writes are non-overlapping and non-empty. So, a single
+        // writingPtr call may result in a number of new segments added. At this point, we cannot
+        // in general merge adjacent writes, as that would require inefficient operations on the
+        // preimage buffer.
+
+        MergedWrites::iterator coveringWrite = _mergedWrites.upper_bound(Write(addr, 0, 0));
+
+        char* const end = addr + len;
+        while (addr < end) {
+            dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr);
+
+            // Determine whether addr[0] is already covered by a write or not.
+            // If covered, adjust addr and len to exclude the covered run from addr[0] onwards.
+
+            if (coveringWrite != _mergedWrites.end()) {
+                char* const cwEnd = coveringWrite->end();
+
+                if (coveringWrite->addr <= addr) {
+                    // If the begin of the covering write is at or before addr[0], addr[0] is covered.
+                    // While the existing pre-image will not generally be the same as the data
+                    // being written now, during rollback only the oldest pre-image matters.

+                    if (end <= cwEnd) {
+                        break; // fully covered
+                    }
+
+                    addr = cwEnd;
+                    coveringWrite++;
+                    dassert(coveringWrite == _mergedWrites.end() || coveringWrite->addr >= cwEnd);
+                }
+            }
+            dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr);
+
+            // If the next coveringWrite overlaps, adjust the end of the uncovered region.
+            char* uncoveredEnd = end;
+            if (coveringWrite != _mergedWrites.end() && coveringWrite->addr < end) {
+                uncoveredEnd = coveringWrite->addr;
+            }
+
+            const size_t uncoveredLen = uncoveredEnd - addr;
+            if (uncoveredLen) {
+                // We are writing to a region that hasn't been declared previously.
+                _mergedWrites.insert(Write(addr, uncoveredLen, _preimageBuffer.size()));
+
+                // Windows requires us to adjust the address space *before* we write to anything.
+                privateViews.makeWritable(addr, uncoveredLen);
+
+                if (!_rollbackWritesDisabled) {
+                    _preimageBuffer.append(addr, uncoveredLen);
+                }
+                addr = uncoveredEnd;
+            }
+        }
+    }
+
+    void* DurRecoveryUnit::writingPtr(void* addr, size_t len) {
         invariant(inAUnitOfWork());

-        if (len == 0) return data; // Don't need to do anything for empty ranges.
+        if (len == 0) {
+            return addr; // Don't need to do anything for empty ranges.
+        }
+
+        invariant(len < size_t(std::numeric_limits<int>::max()));
+
+        _writeCount++;
+        _writeBytes += len;
+        char* const data = static_cast<char*>(addr);
+
+        // The initial writes are stored in a faster, but less memory-efficient way. This will
+        // typically be enough for simple operations, where the extra cost of incremental
+        // coalescing and merging would be too much. For larger writes, more redundancy is
+        // expected, so the cost of checking for duplicates is offset by savings in copying
+        // and allocating preimage buffers. Total memory use of the preimage buffer may be up to
+        // kMaxUnmergedPreimageBytes larger than the amount of memory covered by the write intents.
+
+#ifdef _DEBUG
+        const size_t kMaxUnmergedPreimageBytes = 16*1024;
+#else
+        const size_t kMaxUnmergedPreimageBytes = 10*1024*1024;
+#endif
+
+        if (_preimageBuffer.size() + len > kMaxUnmergedPreimageBytes) {
+            mergingWritingPtr(data, len);
+
+            // After a merged write, no more initial writes can occur or there would be an
+            // ordering violation during rollback. So, ensure that the if-condition will be true
+            // for any future write regardless of length. This is true now because
+            // mergingWritingPtr also will store its first write in _preimageBuffer as well.
+            invariant(_preimageBuffer.size() >= kMaxUnmergedPreimageBytes);
+
+            return addr;
+        }

         // Windows requires us to adjust the address space *before* we write to anything.
         privateViews.makeWritable(data, len);

-        _writes.push_back(Write(static_cast<char*>(data), len, _preimageBuffer.size()));
-        if (!_rollbackDisabled) {
-            _preimageBuffer.append(static_cast<char*>(data), len);
+        _initialWrites.push_back(Write(data, len, _preimageBuffer.size()));
+
+        if (!_rollbackWritesDisabled) {
+            _preimageBuffer.append(data, len);
         }

-        return data;
+        return addr;
     }

     void DurRecoveryUnit::setRollbackWritesDisabled() {
         invariant(inOutermostUnitOfWork());
-        _rollbackDisabled = true;
+        _rollbackWritesDisabled = true;
     }

     void DurRecoveryUnit::registerChange(Change* change) {
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
index 9eb76f652cf..be54f9b76b9 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
@@ -26,6 +26,9 @@
  * it in the license file.
  */

+#include <set>
+#include <string>
+#include <utility>
 #include <vector>

 #include "mongo/base/owned_pointer_vector.h"
@@ -56,15 +59,32 @@ namespace mongo {
         // The recovery unit takes ownership of change.
         virtual void registerChange(Change* change);

-        virtual void* writingPtr(void* data, size_t len);
+        virtual void* writingPtr(void* addr, size_t len);

         virtual void setRollbackWritesDisabled();

         virtual SnapshotId getSnapshotId() const { return SnapshotId(); }

+
     private:
+        /**
+         * Marks writes for journaling, if enabled, and then commits all other Changes in order.
+         * Returns with empty _initialWrites, _mergedWrites, _changes and _preimageBuffer, but
+         * does not reset the _rollbackWritesDisabled or _mustRollback flags. This leaves the
+         * RecoveryUnit ready for more changes that may be committed or rolled back.
+         */
         void commitChanges();
-        void pushChangesToDurSubSystem();
-        void rollbackInnermostChanges();
+
+        /**
+         * Creates a list of write intents to be journaled, and hands it off to the active
+         * DurabilityInterface.
+         */
+        void markWritesForJournaling();
+
+        /**
+         * Restores state by rolling back all writes using the saved pre-images, and then
+         * rolling back all other Changes in LIFO order. Resets internal state.
+         */
+        void rollbackChanges();

         bool inAUnitOfWork() const { return !_startOfUncommittedChangesForLevel.empty(); }
@@ -72,41 +92,96 @@ namespace mongo {
             return _startOfUncommittedChangesForLevel.size() == 1;
         }

-        bool haveUncommitedChangesAtCurrentLevel() const {
-            return _writes.size() > _startOfUncommittedChangesForLevel.back().writeIndex
+        /**
+         * If true, ending a unit of work will cause rollback. Ending a (possibly nested) unit of
+         * work without committing and without making any changes will not cause rollback.
+         */
+        bool haveUncommittedChangesAtCurrentLevel() const {
+            return _writeCount > _startOfUncommittedChangesForLevel.back().writeCount
                 || _changes.size() > _startOfUncommittedChangesForLevel.back().changeIndex;
         }

+        /**
+         * Version of writingPtr that checks existing writes for overlap and only stores those
+         * changes not yet covered by an existing write intent and pre-image.
+         */
+        void mergingWritingPtr(char* data, size_t len);
+
+        /**
+         * Reset to a clean state without any uncommitted changes or writes.
+         */
+        void resetChanges();
+
         // Changes are ordered from oldest to newest.
         typedef OwnedPointerVector<Change> Changes;
         Changes _changes;

-        // These are memory writes inside the mmapv1 mmaped files. Writes are ordered from oldest to
-        // newest. Overlapping and duplicate regions are allowed, since rollback undoes changes in
-        // reverse order.
-        std::string _preimageBuffer;
+
+        // Number of pending uncommitted writes. Incremented even if new write is fully covered by
+        // existing writes.
+        size_t _writeCount;
+        // Total size of the pending uncommitted writes.
+        size_t _writeBytes;
+
+        /**
+         * These are memory writes inside the mmapv1 mmap-ed files. A pointer past the end is used
+         * instead of a pointer to the beginning for the benefit of MergedWrites.
+         */
         struct Write {
-            Write(char* addr, int len, int offset) : addr(addr), len(len), offset(offset) {}
+            Write(char* addr, int len, int offset) : addr(addr), len(len), offset(offset) { }
+            Write(const Write& rhs) : addr(rhs.addr), len(rhs.len), offset(rhs.offset) { }
+            Write() : addr(0), len(0), offset(0) { }
+            bool operator< (const Write& rhs) const { return addr < rhs.addr; }
+
+            struct compareEnd {
+                bool operator() (const Write& lhs, const Write& rhs) const {
+                    return lhs.addr + lhs.len < rhs.addr + rhs.len;
+                }
+            };

-            bool operator < (const Write& rhs) const { return addr < rhs.addr; }
+            char* end() const {
+                return addr + len;
+            }

             char* addr;
             int len;
-            int offset; // index into _preimageBuffer;
+            int offset; // index into _preimageBuffer
         };
-        typedef std::vector<Write> Writes;
-        Writes _writes;

-        // Indexes of the first uncommitted Change/Write in _changes/_writes for each nesting level.
-        // Index 0 in this vector is always the outermost transaction and back() is always the
-        // innermost. The size() is the current nesting level.
+        /**
+         * Writes are ordered by ending address, so MergedWrites::upper_bound() can find the first
+         * overlapping write, if any. Overlapping and duplicate regions are forbidden, as rollback
+         * of MergedWrites undoes changes by address rather than LIFO order. In addition, empty
+         * regions are not allowed. Storing writes by age does not work well for large indexed
+         * arrays, as coalescing is needed to bound the size of the preimage buffer.
+         */
+        typedef std::set<Write, Write::compareEnd> MergedWrites;
+        MergedWrites _mergedWrites;
+
+        // Generally it's more efficient to just store pre-images unconditionally and then
+        // sort/eliminate duplicates at commit time. However, this can lead to excessive memory
+        // use in cases involving large indexed arrays, where the same memory is written many
+        // times. To keep the speed for the general case and bound memory use, the first few MB of
+        // pre-images are stored unconditionally, but once the threshold has been exceeded, the
+        // remainder is stored in a more space-efficient data structure.
+        typedef std::vector<Write> InitialWrites;
+        InitialWrites _initialWrites;
+
+        std::string _preimageBuffer;
+
+        // Index of the first uncommitted Change in _changes and number of writes for each nesting
+        // level. Store the number of writes as maintained in _writeCount rather than the sum of
+        // _initialWrites and _mergedWrites, as coalescing might otherwise result in
+        // haveUncommittedChangesAtCurrentLevel missing merged writes when determining if rollback
+        // is necessary. Index 0 in this vector is always the outermost transaction and back() is
+        // always the innermost. The size() is the current nesting level.
         struct Indexes {
-            Indexes(size_t changeIndex, size_t writeIndex)
+            Indexes(size_t changeIndex, size_t writeCount)
                 : changeIndex(changeIndex)
-                , writeIndex(writeIndex)
+                , writeCount(writeCount)
             {}
             size_t changeIndex;
-            size_t writeIndex;
+            size_t writeCount;
         };
         std::vector<Indexes> _startOfUncommittedChangesForLevel;

@@ -114,10 +189,10 @@ namespace mongo {
         // outermost WUOW rolls back it reverts to false.
        bool _mustRollback;

-        // Default is false.
+        // Default is false.
         // If true, no preimages are tracked. If rollback is subsequently attempted, the process
         // will abort.
-        bool _rollbackDisabled;
+        bool _rollbackWritesDisabled;
     };

 } // namespace mongo
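
For the journaling half (markWritesForJournaling above), the unmerged initial writes are sorted by address and then coalesced so that overlapping or adjacent ranges become a single write intent. The following minimal standalone sketch shows just that coalescing step (coalesce is an invented name; the real code hands the result to getDur().declareWriteIntents()):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

typedef std::pair<char*, size_t> Intent;  // (start address, length)

std::vector<Intent> coalesce(std::vector<Intent> writes) {
    std::vector<Intent> intents;
    if (writes.empty())
        return intents;
    std::sort(writes.begin(), writes.end());  // orders by address, as in the patch
    intents.push_back(writes.front());
    for (size_t i = 1; i < writes.size(); ++i) {
        Intent& last = intents.back();
        char* const lastEnd = last.first + last.second;
        if (writes[i].first <= lastEnd) {
            // overlapping or adjacent: extend the previous intent
            char* const newEnd = std::max(lastEnd, writes[i].first + writes[i].second);
            last.second = newEnd - last.first;
        } else {
            // disjoint: start a new intent
            intents.push_back(writes[i]);
        }
    }
    return intents;
}

int main() {
    char buf[64] = {};
    std::vector<Intent> writes;
    writes.push_back(Intent(buf + 10, 5));
    writes.push_back(Intent(buf, 4));
    writes.push_back(Intent(buf + 12, 8));
    writes.push_back(Intent(buf + 4, 2));
    const std::vector<Intent> intents = coalesce(writes);
    // Prints two intents: offset 0 len 6, then offset 10 len 10.
    for (size_t i = 0; i < intents.size(); ++i)
        std::cout << "intent at offset " << (intents[i].first - buf)
                  << ", len " << intents[i].second << "\n";
}

This mirrors why the patch keeps two containers: the vector is cheap to append to and is coalesced once at commit, while the end-ordered set stays coalesced incrementally, so repeated writes to the same memory stop growing the pre-image buffer.
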