author     Geert Bosch <geert@mongodb.com>    2015-03-26 12:30:27 -0400
committer  Geert Bosch <geert@mongodb.com>    2015-03-31 15:27:46 -0400
commit     b87a1c395527b0981b6613d6ecf949f2c7465ad8 (patch)
tree       02ff5c90b21b053dc9dbe52687599796adc54a0b
parent     5006eb81f94ce90eebb68828b299761b622f0ec1 (diff)
download   mongo-b87a1c395527b0981b6613d6ecf949f2c7465ad8.tar.gz
SERVER-17616: Bound memory usage of MMAPv1 rollback buffers.
(back ported from commit 991ccba6e29ea5b7f51a6ed4a549b8a9291a209b)
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp | 284
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recovery_unit.h   | 119
2 files changed, 304 insertions, 99 deletions
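
The core of the patch is a two-tier pre-image store: early writes are appended to _preimageBuffer unconditionally and tracked in the _initialWrites vector, and once the buffer would grow past kMaxUnmergedPreimageBytes, further writes go through mergingWritingPtr, which saves pre-images only for bytes not already covered. The effect is that repeatedly rewriting the same region no longer grows the rollback buffer without bound. The sketch below shows the threshold switch in isolation; the names (PreimageStore, recordWrite, the per-address coverage check) are illustrative stand-ins, not the patch's own types, and the real code tracks covered byte ranges rather than starting addresses.

#include <cstddef>
#include <iostream>
#include <set>
#include <string>

// Illustrative stand-in for the two-tier pre-image store introduced by the
// patch: a cheap append-only tier for small units of work, and a
// de-duplicating tier once the buffer would otherwise exceed a threshold.
class PreimageStore {
public:
    explicit PreimageStore(size_t threshold) : _threshold(threshold) {}

    void recordWrite(char* addr, size_t len) {
        if (_buffer.size() + len > _threshold) {
            // Past the threshold: only save a pre-image for regions not seen
            // before, so rewriting the same bytes costs nothing extra.
            // (A per-address set is enough to show the idea.)
            if (_covered.insert(addr).second) {
                _buffer.append(addr, len);
            }
            return;
        }
        // Fast path: append unconditionally, duplicates and all.
        _buffer.append(addr, len);
    }

    size_t bufferedBytes() const { return _buffer.size(); }

private:
    const size_t _threshold;
    std::string _buffer;       // concatenated pre-images
    std::set<char*> _covered;  // regions already saved (simplified)
};

int main() {
    char page[64] = {0};
    PreimageStore store(128);  // tiny threshold, for demonstration
    for (int i = 0; i < 1000; ++i) {
        store.recordWrite(page, sizeof(page));  // same region, written repeatedly
    }
    // Bounded at roughly threshold + one region (192 bytes here), instead of
    // the 64000 bytes an unconditional append would have used.
    std::cout << store.bufferedBytes() << " bytes of pre-images buffered\n";
    return 0;
}
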
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
index 6382fcf5f83..af4d93ba039 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
@@ -33,6 +33,9 @@
#include "mongo/db/storage/mmap_v1/dur_recovery_unit.h"
#include <algorithm>
+#include <limits>
+#include <map>
+#include <set>
#include <string>
#include "mongo/db/storage/mmap_v1/dur.h"
@@ -42,12 +45,12 @@
namespace mongo {
- DurRecoveryUnit::DurRecoveryUnit() : _mustRollback(false), _rollbackDisabled(false) {
-
+ DurRecoveryUnit::DurRecoveryUnit()
+ : _writeCount(0), _writeBytes(0), _mustRollback(false), _rollbackWritesDisabled(false) {
}
void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
- _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writes.size()));
+ _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writeCount));
}
void DurRecoveryUnit::commitUnitOfWork() {
@@ -58,9 +61,9 @@ namespace mongo {
// If we are nested, make all changes for this level part of the containing UnitOfWork.
// They will be added to the global damages list once the outermost UnitOfWork commits,
// which it must now do.
- if (haveUncommitedChangesAtCurrentLevel()) {
+ if (haveUncommittedChangesAtCurrentLevel()) {
_startOfUncommittedChangesForLevel.back() =
- Indexes(_changes.size(), _writes.size());
+ Indexes(_changes.size(), _writeCount);
}
return;
}
@@ -74,18 +77,17 @@ namespace mongo {
void DurRecoveryUnit::endUnitOfWork() {
invariant(inAUnitOfWork());
- if (haveUncommitedChangesAtCurrentLevel()) {
- rollbackInnermostChanges();
+ if (haveUncommittedChangesAtCurrentLevel()) {
+ _mustRollback = true;
}
// Reset back to default if this is the last unwind of the recovery unit. That way, it can
// be reused for new operations.
if (inOutermostUnitOfWork()) {
- dassert(_changes.empty());
- dassert(_writes.empty());
- _preimageBuffer.clear();
- _mustRollback = false;
- _rollbackDisabled = false;
+ rollbackChanges();
+ _rollbackWritesDisabled = false;
+ dassert(_changes.empty() && _initialWrites.empty() && _mergedWrites.empty());
+ dassert( _preimageBuffer.empty() && !_writeCount && !_writeBytes && !_mustRollback);
}
_startOfUncommittedChangesForLevel.pop_back();
@@ -97,98 +99,137 @@ namespace mongo {
}
void DurRecoveryUnit::commitChanges() {
- if (!inAUnitOfWork())
- return;
-
invariant(!_mustRollback);
invariant(inOutermostUnitOfWork());
invariant(_startOfUncommittedChangesForLevel.front().changeIndex == 0);
- invariant(_startOfUncommittedChangesForLevel.front().writeIndex == 0);
+ invariant(_startOfUncommittedChangesForLevel.front().writeCount == 0);
if (getDur().isDurable())
- pushChangesToDurSubSystem();
+ markWritesForJournaling();
- for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
+ for (Changes::const_iterator it = _changes.begin(), end = _changes.end();
+ it != end; ++it) {
(*it)->commit();
}
- // We now reset to a "clean" state without any uncommitted changes.
- _changes.clear();
- _writes.clear();
- _preimageBuffer.clear();
+ resetChanges();
+
}
- void DurRecoveryUnit::pushChangesToDurSubSystem() {
- if (_writes.empty())
+ void DurRecoveryUnit::markWritesForJournaling() {
+ if (!_writeCount)
return;
typedef std::pair<void*, unsigned> Intent;
std::vector<Intent> intents;
- intents.reserve(_writes.size());
-
- // orders by addr so we can coalesce overlapping and adjacent writes
- std::sort(_writes.begin(), _writes.end());
-
- intents.push_back(std::make_pair(_writes.front().addr, _writes.front().len));
- for (Writes::iterator it = (_writes.begin() + 1), end = _writes.end(); it != end; ++it) {
- Intent& lastIntent = intents.back();
- char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
- if (it->addr <= lastEnd) {
- // overlapping or adjacent, so extend.
- ptrdiff_t extendedLen = (it->addr + it->len) - static_cast<char*>(lastIntent.first);
- lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
+ const size_t numStoredWrites = _initialWrites.size() + _mergedWrites.size();
+ intents.reserve(numStoredWrites);
+
+ // Show very large units of work at LOG(1) level as they may hint at performance issues
+ const int logLevel = (_writeCount > 100*1000 || _writeBytes > 50*1024*1024) ? 1 : 3;
+
+ LOG(logLevel) << _writeCount << " writes (" << _writeBytes / 1024 << " kB) covered by "
+ << numStoredWrites << " pre-images ("
+ << _preimageBuffer.size() / 1024 << " kB) ";
+
+ // orders the initial, unmerged writes by address so we can coalesce overlapping and
+ // adjacent writes
+ std::sort(_initialWrites.begin(), _initialWrites.end());
+
+ if (!_initialWrites.empty()) {
+ intents.push_back(std::make_pair(_initialWrites.front().addr,
+ _initialWrites.front().len));
+ for (InitialWrites::iterator it = (_initialWrites.begin() + 1),
+ end = _initialWrites.end();
+ it != end;
+ ++it) {
+ Intent& lastIntent = intents.back();
+ char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
+ if (it->addr <= lastEnd) {
+ // overlapping or adjacent, so extend.
+ ptrdiff_t extendedLen = (it->end()) - static_cast<char*>(lastIntent.first);
+ lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
+ }
+ else {
+ // not overlapping, so create a new intent
+ intents.push_back(std::make_pair(it->addr, it->len));
+ }
}
- else {
- // not overlapping, so create a new intent
- intents.push_back(std::make_pair(it->addr, it->len));
+ }
+
+ MergedWrites::iterator it = _mergedWrites.begin();
+ if (it != _mergedWrites.end()) {
+ intents.push_back(std::make_pair(it->addr, it->len));
+ while (++it != _mergedWrites.end()) {
+ // Check the property that write intents are sorted and don't overlap.
+ invariant(it->addr >= intents.back().first);
+ Intent& lastIntent = intents.back();
+ char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
+ if (it->addr == lastEnd) {
+ // adjacent, so extend.
+ lastIntent.second += it->len;
+ }
+ else {
+ // not overlapping, so create a new intent
+ invariant(it->addr > lastEnd);
+ intents.push_back(std::make_pair(it->addr, it->len));
+ }
}
}
+ LOG(logLevel) << _mergedWrites.size() << " pre-images " << "coalesced into "
+ << intents.size() << " write intents";
getDur().declareWriteIntents(intents);
}
- void DurRecoveryUnit::rollbackInnermostChanges() {
- // Using signed ints to avoid issues in loops below around index 0.
- invariant(_changes.size() <= size_t(std::numeric_limits<int>::max()));
- invariant(_writes.size() <= size_t(std::numeric_limits<int>::max()));
- const int changesRollbackTo = _startOfUncommittedChangesForLevel.back().changeIndex;
- const int writesRollbackTo = _startOfUncommittedChangesForLevel.back().writeIndex;
+ void DurRecoveryUnit::resetChanges() {
+ _writeCount = 0;
+ _writeBytes = 0;
+ _initialWrites.clear();
+ _mergedWrites.clear();
+ _changes.clear();
+ _preimageBuffer.clear();
+ }
+
+ void DurRecoveryUnit::rollbackChanges() {
+ invariant(inOutermostUnitOfWork());
+ invariant(!_startOfUncommittedChangesForLevel.back().changeIndex);
+ invariant(!_startOfUncommittedChangesForLevel.back().writeCount);
// First rollback disk writes, then Changes. This matches behavior in other storage engines
// that either rollback a transaction or don't write a writebatch.
- if (!_rollbackDisabled) {
- LOG(2) << " ***** ROLLING BACK " << (_writes.size() - writesRollbackTo)
- << " disk writes";
+ if (_rollbackWritesDisabled) {
+ LOG(2) << " ***** NOT ROLLING BACK " << _writeCount << " disk writes";
+ }
+ else {
+ LOG(2) << " ***** ROLLING BACK " << _writeCount << " disk writes";
+
+ // First roll back the merged writes. These have no overlap or ordering requirement
+ // other than needing to be rolled back before all _initialWrites.
+ for (MergedWrites::iterator it = _mergedWrites.begin();
+ it != _mergedWrites.end();
+ ++it) {
+ _preimageBuffer.copy(it->addr, it->len, it->offset);
+ }
- for (int i = _writes.size() - 1; i >= writesRollbackTo; i--) {
- // TODO need to add these pages to our "dirty count" somehow.
- _preimageBuffer.copy(_writes[i].addr, _writes[i].len, _writes[i].offset);
+ // Then roll back the initial writes in LIFO order, as these might have overlaps.
+ for (InitialWrites::reverse_iterator rit = _initialWrites.rbegin();
+ rit != _initialWrites.rend();
+ ++rit) {
+ _preimageBuffer.copy(rit->addr, rit->len, rit->offset);
}
}
- LOG(2) << " ***** ROLLING BACK " << (_changes.size() - changesRollbackTo)
- << " custom changes";
- for (int i = _changes.size() - 1; i >= changesRollbackTo; i--) {
+ LOG(2) << " ***** ROLLING BACK " << (_changes.size()) << " custom changes";
+
+ for (int i = _changes.size() - 1; i >= 0; i--) {
LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i]));
_changes[i]->rollback();
}
- _writes.erase(_writes.begin() + writesRollbackTo, _writes.end());
- _changes.erase(_changes.begin() + changesRollbackTo, _changes.end());
-
- if (inOutermostUnitOfWork()) {
- // We just rolled back so we are now "clean" and don't need to roll back anymore.
- invariant(_changes.empty());
- invariant(_writes.empty());
- _preimageBuffer.clear();
- _mustRollback = false;
- }
- else {
- // Inner UOW rolled back, so outer must not commit. We can loosen this in the future,
- // but that would require all StorageEngines to support rollback of nested transactions.
- _mustRollback = true;
- }
+ resetChanges();
+ _mustRollback = false;
}
bool DurRecoveryUnit::awaitCommit() {
@@ -196,26 +237,115 @@ namespace mongo {
return getDur().awaitCommit();
}
- void* DurRecoveryUnit::writingPtr(void* data, size_t len) {
+ void DurRecoveryUnit::mergingWritingPtr(char* addr, size_t len) {
+ // The invariant is that all writes are non-overlapping and non-empty. So, a single
+ // writingPtr call may result in a number of new segments added. At this point, we cannot
+ // in general merge adjacent writes, as that would require inefficient operations on the
+ // preimage buffer.
+
+ MergedWrites::iterator coveringWrite = _mergedWrites.upper_bound(Write(addr, 0, 0));
+
+ char* const end = addr + len;
+ while (addr < end) {
+ dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr);
+
+ // Determine whether addr[0] is already covered by a write or not.
+ // If covered, adjust addr and len to exclude the covered run from addr[0] onwards.
+
+ if (coveringWrite != _mergedWrites.end()) {
+ char* const cwEnd = coveringWrite->end();
+
+ if (coveringWrite->addr <= addr) {
+ // If the beginning of the covering write is at or before addr[0], addr[0] is covered.
+ // While the existing pre-image will not generally be the same as the data
+ // being written now, during rollback only the oldest pre-image matters.
+
+ if (end <= cwEnd) {
+ break; // fully covered
+ }
+
+ addr = cwEnd;
+ coveringWrite++;
+ dassert(coveringWrite == _mergedWrites.end() || coveringWrite->addr >= cwEnd);
+ }
+ }
+ dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr);
+
+ // If the next coveringWrite overlaps, adjust the end of the uncovered region.
+ char* uncoveredEnd = end;
+ if (coveringWrite != _mergedWrites.end() && coveringWrite->addr < end) {
+ uncoveredEnd = coveringWrite->addr;
+ }
+
+ const size_t uncoveredLen = uncoveredEnd - addr;
+ if (uncoveredLen) {
+ // We are writing to a region that hasn't been declared previously.
+ _mergedWrites.insert(Write(addr, uncoveredLen, _preimageBuffer.size()));
+
+ // Windows requires us to adjust the address space *before* we write to anything.
+ privateViews.makeWritable(addr, uncoveredLen);
+
+ if (!_rollbackWritesDisabled) {
+ _preimageBuffer.append(addr, uncoveredLen);
+ }
+ addr = uncoveredEnd;
+ }
+ }
+ }
+
+ void* DurRecoveryUnit::writingPtr(void* addr, size_t len) {
invariant(inAUnitOfWork());
- if (len == 0) return data; // Don't need to do anything for empty ranges.
+ if (len == 0) {
+ return addr; // Don't need to do anything for empty ranges.
+ }
+
invariant(len < size_t(std::numeric_limits<int>::max()));
+ _writeCount++;
+ _writeBytes += len;
+ char* const data = static_cast<char*>(addr);
+
+ // The initial writes are stored in a faster, but less memory-efficient way. This will
+ // typically be enough for simple operations, where the extra cost of incremental
+ // coalescing and merging would be too much. For larger writes, more redundancy is
+ // expected, so the cost of checking for duplicates is offset by savings in copying
+ // and allocating preimage buffers. Total memory use of the preimage buffer may be up to
+ // kMaxUnmergedPreimageBytes larger than the amount of memory covered by the write intents.
+
+#ifdef _DEBUG
+ const size_t kMaxUnmergedPreimageBytes = 16*1024;
+#else
+ const size_t kMaxUnmergedPreimageBytes = 10*1024*1024;
+#endif
+
+ if (_preimageBuffer.size() + len > kMaxUnmergedPreimageBytes) {
+ mergingWritingPtr(data, len);
+
+ // After a merged write, no more initial writes can occur or there would be an
+ // ordering violation during rollback. So, ensure that the if-condition will be true
+ // for any future write regardless of length. This is true now because
+ // mergingWritingPtr will store its first write in _preimageBuffer as well.
+ invariant(_preimageBuffer.size() >= kMaxUnmergedPreimageBytes);
+
+ return addr;
+ }
+
// Windows requires us to adjust the address space *before* we write to anything.
privateViews.makeWritable(data, len);
- _writes.push_back(Write(static_cast<char*>(data), len, _preimageBuffer.size()));
- if (!_rollbackDisabled) {
- _preimageBuffer.append(static_cast<char*>(data), len);
+ _initialWrites.push_back(Write(data, len, _preimageBuffer.size()));
+
+ if (!_rollbackWritesDisabled) {
+ _preimageBuffer.append(data, len);
}
- return data;
+ return addr;
}
void DurRecoveryUnit::setRollbackWritesDisabled() {
invariant(inOutermostUnitOfWork());
- _rollbackDisabled = true;
+ _rollbackWritesDisabled = true;
}
void DurRecoveryUnit::registerChange(Change* change) {
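
Before the header changes, a note on the coalescing step above: markWritesForJournaling sorts _initialWrites by address and then folds overlapping or adjacent ranges into a single write intent, so the durability layer journals each byte range once. The standalone sketch below reproduces that sort-and-extend loop over plain (address, length) pairs; the function name coalesce and the sample ranges are illustrative, not part of the patch.

#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>

// Fold overlapping or adjacent [addr, addr + len) ranges into a minimal list
// of intents, mirroring the sort-and-extend loop in markWritesForJournaling.
typedef std::pair<char*, unsigned> Intent;

std::vector<Intent> coalesce(std::vector<Intent> writes) {
    std::vector<Intent> intents;
    if (writes.empty())
        return intents;

    // Order by starting address so each write only needs to be checked
    // against the most recently emitted intent.
    std::sort(writes.begin(), writes.end());

    intents.push_back(writes.front());
    for (size_t i = 1; i < writes.size(); ++i) {
        Intent& last = intents.back();
        char* lastEnd = last.first + last.second;
        char* curEnd = writes[i].first + writes[i].second;
        if (writes[i].first <= lastEnd) {
            // Overlapping or adjacent: extend the previous intent.
            last.second = std::max(last.second, static_cast<unsigned>(curEnd - last.first));
        }
        else {
            // Disjoint: start a new intent.
            intents.push_back(writes[i]);
        }
    }
    return intents;
}

int main() {
    char region[100];
    std::vector<Intent> writes;
    writes.push_back(Intent(region + 10, 20));  // [10, 30)
    writes.push_back(Intent(region + 25, 10));  // [25, 35), overlaps the first
    writes.push_back(Intent(region + 35, 5));   // [35, 40), adjacent
    writes.push_back(Intent(region + 60, 10));  // [60, 70), disjoint

    std::cout << coalesce(writes).size() << " intents\n";  // 2: [10, 40) and [60, 70)
    return 0;
}
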
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
index 9eb76f652cf..be54f9b76b9 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
@@ -26,6 +26,9 @@
* it in the license file.
*/
+#include <set>
+#include <string>
+#include <utility>
#include <vector>
#include "mongo/base/owned_pointer_vector.h"
@@ -56,15 +59,32 @@ namespace mongo {
// The recovery unit takes ownership of change.
virtual void registerChange(Change* change);
- virtual void* writingPtr(void* data, size_t len);
+ virtual void* writingPtr(void* addr, size_t len);
virtual void setRollbackWritesDisabled();
virtual SnapshotId getSnapshotId() const { return SnapshotId(); }
+
private:
+ /**
+ * Marks writes for journaling, if enabled, and then commits all other Changes in order.
+ * Returns with empty _initialWrites, _mergedWrites, _changes and _preimageBuffer, but
+ * does not reset the _rollbackWritesDisabled or _mustRollback flags. This leaves the
+ * RecoveryUnit ready for more changes that may be committed or rolled back.
+ */
void commitChanges();
- void pushChangesToDurSubSystem();
- void rollbackInnermostChanges();
+
+ /**
+ * Creates a list of write intents to be journaled, and hands it off to the active
+ * DurabilityInterface.
+ */
+ void markWritesForJournaling();
+
+ /**
+ * Restores state by rolling back all writes using the saved pre-images, and then
+ * rolling back all other Changes in LIFO order. Resets internal state.
+ */
+ void rollbackChanges();
bool inAUnitOfWork() const { return !_startOfUncommittedChangesForLevel.empty(); }
@@ -72,41 +92,96 @@ namespace mongo {
return _startOfUncommittedChangesForLevel.size() == 1;
}
- bool haveUncommitedChangesAtCurrentLevel() const {
- return _writes.size() > _startOfUncommittedChangesForLevel.back().writeIndex
+ /**
+ * If true, ending a unit of work will cause rollback. Ending a (possibly nested) unit of
+ * work without committing and without making any changes will not cause rollback.
+ */
+ bool haveUncommittedChangesAtCurrentLevel() const {
+ return _writeCount > _startOfUncommittedChangesForLevel.back().writeCount
|| _changes.size() > _startOfUncommittedChangesForLevel.back().changeIndex;
}
+ /**
+ * Version of writingPtr that checks existing writes for overlap and only stores those
+ * changes not yet covered by an existing write intent and pre-image.
+ */
+ void mergingWritingPtr(char* data, size_t len);
+
+ /**
+ * Reset to a clean state without any uncommitted changes or writes.
+ */
+ void resetChanges();
+
// Changes are ordered from oldest to newest.
typedef OwnedPointerVector<Change> Changes;
Changes _changes;
- // These are memory writes inside the mmapv1 mmaped files. Writes are ordered from oldest to
- // newest. Overlapping and duplicate regions are allowed, since rollback undoes changes in
- // reverse order.
- std::string _preimageBuffer;
+
+ // Number of pending uncommitted writes. Incremented even if new write is fully covered by
+ // existing writes.
+ size_t _writeCount;
+ // Total size of the pending uncommitted writes.
+ size_t _writeBytes;
+
+ /**
+ * These are memory writes inside the mmapv1 mmap-ed files. A pointer just past the end
+ * is used instead of a pointer to the beginning for the benefit of MergedWrites.
+ */
struct Write {
- Write(char* addr, int len, int offset) : addr(addr), len(len), offset(offset) {}
+ Write(char* addr, int len, int offset) : addr(addr), len(len), offset(offset) { }
+ Write(const Write& rhs) : addr(rhs.addr), len(rhs.len), offset(rhs.offset) { }
+ Write() : addr(0), len(0), offset(0) { }
+ bool operator< (const Write& rhs) const { return addr < rhs.addr; }
+
+ struct compareEnd {
+ bool operator() (const Write& lhs, const Write& rhs) const {
+ return lhs.addr + lhs.len < rhs.addr + rhs.len;
+ }
+ };
- bool operator < (const Write& rhs) const { return addr < rhs.addr; }
+ char* end() const {
+ return addr + len;
+ }
char* addr;
int len;
- int offset; // index into _preimageBuffer;
+ int offset; // index into _preimageBuffer
};
- typedef std::vector<Write> Writes;
- Writes _writes;
- // Indexes of the first uncommitted Change/Write in _changes/_writes for each nesting level.
- // Index 0 in this vector is always the outermost transaction and back() is always the
- // innermost. The size() is the current nesting level.
+ /**
+ * Writes are ordered by ending address, so MergedWrites::upper_bound() can find the first
+ * overlapping write, if any. Overlapping and duplicate regions are forbidden, as rollback
+ * of MergedWrites undoes changes by address rather than LIFO order. In addition, empty
+ * regions are not allowed. Storing writes by age does not work well for large indexed
+ * arrays, as coalescing is needed to bound the size of the preimage buffer.
+ */
+ typedef std::set<Write, Write::compareEnd> MergedWrites;
+ MergedWrites _mergedWrites;
+
+ // Generally it's more efficient to just store pre-images unconditionally and then
+ // sort/eliminate duplicates at commit time. However, this can lead to excessive memory
+ // use in cases involving large indexed arrays, where the same memory is written many
+ // times. To keep the speed for the general case and bound memory use, the first few MB of
+ // pre-images are stored unconditionally, but once the threshold has been exceeded, the
+ // remainder is stored in a more space-efficient data structure.
+ typedef std::vector<Write> InitialWrites;
+ InitialWrites _initialWrites;
+
+ std::string _preimageBuffer;
+
+ // Index of the first uncommitted Change in _changes and number of writes for each nesting
+ // level. Store the number of writes as maintained in _writeCount rather than the sum of
+ // _initialWrites and _mergedWrites, as coalescing might otherwise result in
+ // haveUncommittedChangesAtCurrentLevel missing merged writes when determining if rollback
+ // is necessary. Index 0 in this vector is always the outermost transaction and back() is
+ // always the innermost. The size() is the current nesting level.
struct Indexes {
- Indexes(size_t changeIndex, size_t writeIndex)
+ Indexes(size_t changeIndex, size_t writeCount)
: changeIndex(changeIndex)
- , writeIndex(writeIndex)
+ , writeCount(writeCount)
{}
size_t changeIndex;
- size_t writeIndex;
+ size_t writeCount;
};
std::vector<Indexes> _startOfUncommittedChangesForLevel;
@@ -114,10 +189,10 @@ namespace mongo {
// outermost WUOW rolls back it reverts to false.
bool _mustRollback;
- // Default is false.
+ // Default is false.
// If true, no preimages are tracked. If rollback is subsequently attempted, the process
// will abort.
- bool _rollbackDisabled;
+ bool _rollbackWritesDisabled;
};
} // namespace mongo
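
A closing note on the _mergedWrites lookup in mergingWritingPtr: because the set is ordered by ending address (Write::compareEnd), an upper_bound call with a zero-length probe at addr returns the first stored write whose end lies strictly past addr, which is the only entry that can still cover or overlap bytes starting at addr. The model below uses its own illustrative types (Range, CompareEnd) rather than the patch's Write and MergedWrites, but the ordering and lookup follow the same idea.

#include <cstddef>
#include <iostream>
#include <set>

// Ranges kept sorted by their end address, as MergedWrites does with Write.
struct Range {
    char* addr;
    size_t len;
    char* end() const { return addr + len; }
};

struct CompareEnd {
    bool operator()(const Range& a, const Range& b) const {
        return a.addr + a.len < b.addr + b.len;
    }
};

int main() {
    static char mem[100];
    std::set<Range, CompareEnd> merged;

    Range first = { mem + 10, 10 };   // covers [10, 20)
    Range second = { mem + 40, 10 };  // covers [40, 50)
    merged.insert(first);
    merged.insert(second);

    // Probe for a new write starting at offset 15. The zero-length key's end
    // is mem + 15, so upper_bound returns the first range ending after that
    // point: [10, 20), which already covers byte 15. A probe at offset 30
    // would land on [40, 50), and one at offset 60 would return end().
    Range probe = { mem + 15, 0 };
    std::set<Range, CompareEnd>::iterator it = merged.upper_bound(probe);
    std::cout << "[" << (it->addr - mem) << ", " << (it->end() - mem) << ")\n";  // [10, 20)
    return 0;
}
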