Diffstat (limited to 'src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp')
-rw-r--r--  src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp  |  284
1 file changed, 207 insertions(+), 77 deletions(-)
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
index 6382fcf5f83..af4d93ba039 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
@@ -33,6 +33,9 @@
#include "mongo/db/storage/mmap_v1/dur_recovery_unit.h"
#include <algorithm>
+#include <limits>
+#include <map>
+#include <set>
#include <string>
#include "mongo/db/storage/mmap_v1/dur.h"
@@ -42,12 +45,12 @@
namespace mongo {
- DurRecoveryUnit::DurRecoveryUnit() : _mustRollback(false), _rollbackDisabled(false) {
-
+ DurRecoveryUnit::DurRecoveryUnit()
+ : _writeCount(0), _writeBytes(0), _mustRollback(false), _rollbackWritesDisabled(false) {
}
void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
- _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writes.size()));
+ _startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writeCount));
}
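The constructor and beginUnitOfWork() above now record a running write count per nesting level instead of an index into a writes vector. A minimal, self-contained sketch of that per-level bookkeeping pattern, using hypothetical names rather than the real DurRecoveryUnit members, might look like this:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Hypothetical illustration: each nesting level remembers how many changes
    // and writes existed when it began, so a later commit or rollback knows
    // where its own work starts.
    struct LevelMarker {
        size_t changeIndex;  // first change belonging to this level
        size_t writeCount;   // number of writes seen before this level began
    };

    class NestedTracker {
    public:
        void begin() {
            _levels.push_back(LevelMarker{_changes.size(), _writeCount});
        }
        void recordChange() { _changes.push_back(0); }
        void recordWrite() { ++_writeCount; }
        bool haveUncommittedChangesAtCurrentLevel() const {
            return _changes.size() > _levels.back().changeIndex ||
                   _writeCount > _levels.back().writeCount;
        }
        void end() { _levels.pop_back(); }

    private:
        std::vector<int> _changes;
        size_t _writeCount = 0;
        std::vector<LevelMarker> _levels;
    };

    int main() {
        NestedTracker t;
        t.begin();                 // outer unit of work
        t.recordWrite();
        t.begin();                 // nested unit of work
        assert(!t.haveUncommittedChangesAtCurrentLevel());
        t.recordChange();
        assert(t.haveUncommittedChangesAtCurrentLevel());
        t.end();
        t.end();
        return 0;
    }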
void DurRecoveryUnit::commitUnitOfWork() {
@@ -58,9 +61,9 @@ namespace mongo {
// If we are nested, make all changes for this level part of the containing UnitOfWork.
// They will be added to the global damages list once the outermost UnitOfWork commits,
// which it must now do.
- if (haveUncommitedChangesAtCurrentLevel()) {
+ if (haveUncommittedChangesAtCurrentLevel()) {
_startOfUncommittedChangesForLevel.back() =
- Indexes(_changes.size(), _writes.size());
+ Indexes(_changes.size(), _writeCount);
}
return;
}
@@ -74,18 +77,17 @@ namespace mongo {
void DurRecoveryUnit::endUnitOfWork() {
invariant(inAUnitOfWork());
- if (haveUncommitedChangesAtCurrentLevel()) {
- rollbackInnermostChanges();
+ if (haveUncommittedChangesAtCurrentLevel()) {
+ _mustRollback = true;
}
// Reset back to default if this is the last unwind of the recovery unit. That way, it can
// be reused for new operations.
if (inOutermostUnitOfWork()) {
- dassert(_changes.empty());
- dassert(_writes.empty());
- _preimageBuffer.clear();
- _mustRollback = false;
- _rollbackDisabled = false;
+ rollbackChanges();
+ _rollbackWritesDisabled = false;
+ dassert(_changes.empty() && _initialWrites.empty() && _mergedWrites.empty());
+ dassert(_preimageBuffer.empty() && !_writeCount && !_writeBytes && !_mustRollback);
}
_startOfUncommittedChangesForLevel.pop_back();
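In the new endUnitOfWork(), an inner level with uncommitted changes only sets _mustRollback; the actual undo is deferred until the outermost level unwinds, where rollbackChanges() runs and the flags are reset. A simplified, hypothetical model of that deferred-rollback control flow (not the actual class):

    #include <cassert>

    // Simplified model of the deferred-rollback rule: inner levels only flag
    // the failure; the outermost level performs the rollback and resets state.
    class UnitOfWorkStack {
    public:
        void begin() { ++_depth; }

        void endWithUncommittedChanges() {
            _mustRollback = true;   // inner failure poisons the whole nest
            endCommon();
        }

        void endClean() { endCommon(); }

        bool rolledBack() const { return _didRollback; }

    private:
        void endCommon() {
            if (_depth == 1) {              // outermost level unwinding
                if (_mustRollback) {
                    _didRollback = true;    // stand-in for rollbackChanges()
                    _mustRollback = false;
                }
            }
            --_depth;
        }

        int _depth = 0;
        bool _mustRollback = false;
        bool _didRollback = false;
    };

    int main() {
        UnitOfWorkStack uow;
        uow.begin();                      // outer
        uow.begin();                      // inner
        uow.endWithUncommittedChanges();  // nothing rolled back yet
        assert(!uow.rolledBack());
        uow.endClean();                   // outermost unwind triggers the rollback
        assert(uow.rolledBack());
        return 0;
    }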
@@ -97,98 +99,137 @@ namespace mongo {
}
void DurRecoveryUnit::commitChanges() {
- if (!inAUnitOfWork())
- return;
-
invariant(!_mustRollback);
invariant(inOutermostUnitOfWork());
invariant(_startOfUncommittedChangesForLevel.front().changeIndex == 0);
- invariant(_startOfUncommittedChangesForLevel.front().writeIndex == 0);
+ invariant(_startOfUncommittedChangesForLevel.front().writeCount == 0);
if (getDur().isDurable())
- pushChangesToDurSubSystem();
+ markWritesForJournaling();
- for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
+ for (Changes::const_iterator it = _changes.begin(), end = _changes.end();
+ it != end; ++it) {
(*it)->commit();
}
- // We now reset to a "clean" state without any uncommitted changes.
- _changes.clear();
- _writes.clear();
- _preimageBuffer.clear();
+ resetChanges();
+
}
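commitChanges() journals the write intents (when durable) and then invokes commit() on each registered Change in order; rollback, by contrast, runs the callbacks in reverse. A small standalone sketch of that two-phase callback pattern, with an illustrative PrintingChange standing in for real Change subclasses:

    #include <iostream>
    #include <memory>
    #include <vector>

    // Sketch of the commit/rollback callback pattern used by the Changes list:
    // registered objects expose commit() and rollback(); on success each
    // commit() runs in registration order, on failure rollback() runs in
    // reverse order.
    class Change {
    public:
        virtual ~Change() {}
        virtual void commit() = 0;
        virtual void rollback() = 0;
    };

    class PrintingChange : public Change {
    public:
        explicit PrintingChange(int id) : _id(id) {}
        void commit() override { std::cout << "commit " << _id << "\n"; }
        void rollback() override { std::cout << "rollback " << _id << "\n"; }

    private:
        int _id;
    };

    int main() {
        std::vector<std::unique_ptr<Change>> changes;
        changes.emplace_back(new PrintingChange(1));
        changes.emplace_back(new PrintingChange(2));

        const bool mustRollback = false;  // flip to true to see reverse-order undo
        if (!mustRollback) {
            for (auto& c : changes) c->commit();          // forward order
        } else {
            for (auto it = changes.rbegin(); it != changes.rend(); ++it)
                (*it)->rollback();                        // reverse (LIFO) order
        }
        return 0;
    }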
- void DurRecoveryUnit::pushChangesToDurSubSystem() {
- if (_writes.empty())
+ void DurRecoveryUnit::markWritesForJournaling() {
+ if (!_writeCount)
return;
typedef std::pair<void*, unsigned> Intent;
std::vector<Intent> intents;
- intents.reserve(_writes.size());
-
- // orders by addr so we can coalesce overlapping and adjacent writes
- std::sort(_writes.begin(), _writes.end());
-
- intents.push_back(std::make_pair(_writes.front().addr, _writes.front().len));
- for (Writes::iterator it = (_writes.begin() + 1), end = _writes.end(); it != end; ++it) {
- Intent& lastIntent = intents.back();
- char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
- if (it->addr <= lastEnd) {
- // overlapping or adjacent, so extend.
- ptrdiff_t extendedLen = (it->addr + it->len) - static_cast<char*>(lastIntent.first);
- lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
+ const size_t numStoredWrites = _initialWrites.size() + _mergedWrites.size();
+ intents.reserve(numStoredWrites);
+
+ // Show very large units of work at LOG(1) level as they may hint at performance issues
+ const int logLevel = (_writeCount > 100*1000 || _writeBytes > 50*1024*1024) ? 1 : 3;
+
+ LOG(logLevel) << _writeCount << " writes (" << _writeBytes / 1024 << " kB) covered by "
+ << numStoredWrites << " pre-images ("
+ << _preimageBuffer.size() / 1024 << " kB) ";
+
+ // orders the initial, unmerged writes by address so we can coalesce overlapping and
+ // adjacent writes
+ std::sort(_initialWrites.begin(), _initialWrites.end());
+
+ if (!_initialWrites.empty()) {
+ intents.push_back(std::make_pair(_initialWrites.front().addr,
+ _initialWrites.front().len));
+ for (InitialWrites::iterator it = (_initialWrites.begin() + 1),
+ end = _initialWrites.end();
+ it != end;
+ ++it) {
+ Intent& lastIntent = intents.back();
+ char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
+ if (it->addr <= lastEnd) {
+ // overlapping or adjacent, so extend.
+ ptrdiff_t extendedLen = (it->end()) - static_cast<char*>(lastIntent.first);
+ lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
+ }
+ else {
+ // not overlapping, so create a new intent
+ intents.push_back(std::make_pair(it->addr, it->len));
+ }
}
- else {
- // not overlapping, so create a new intent
- intents.push_back(std::make_pair(it->addr, it->len));
+ }
+
+ MergedWrites::iterator it = _mergedWrites.begin();
+ if (it != _mergedWrites.end()) {
+ intents.push_back(std::make_pair(it->addr, it->len));
+ while (++it != _mergedWrites.end()) {
+ // Check the property that write intents are sorted and don't overlap.
+ invariant(it->addr >= intents.back().first);
+ Intent& lastIntent = intents.back();
+ char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second;
+ if (it->addr == lastEnd) {
+ // adjacent, so extend.
+ lastIntent.second += it->len;
+ }
+ else {
+ // not overlapping, so create a new intent
+ invariant(it->addr > lastEnd);
+ intents.push_back(std::make_pair(it->addr, it->len));
+ }
}
}
+ LOG(logLevel) << _mergedWrites.size() << " pre-images " << "coalesced into "
+ << intents.size() << " write intents";
getDur().declareWriteIntents(intents);
}
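markWritesForJournaling() sorts the initial writes by address and folds overlapping or adjacent records into a single intent before handing them to declareWriteIntents(). A standalone sketch of that interval-coalescing step, using a plain pair-based Intent rather than the real Write type:

    #include <algorithm>
    #include <cassert>
    #include <utility>
    #include <vector>

    // Coalesce address-sorted (addr, len) records into non-overlapping intents:
    // a record that overlaps or touches the previous intent extends it,
    // otherwise it starts a new intent. This mirrors the merging done for the
    // unmerged, initial writes.
    typedef std::pair<char*, unsigned> Intent;

    std::vector<Intent> coalesce(std::vector<std::pair<char*, unsigned> > writes) {
        std::vector<Intent> intents;
        if (writes.empty())
            return intents;

        std::sort(writes.begin(), writes.end());  // order by address
        intents.push_back(writes.front());

        for (size_t i = 1; i < writes.size(); i++) {
            Intent& last = intents.back();
            char* lastEnd = last.first + last.second;
            if (writes[i].first <= lastEnd) {
                // Overlapping or adjacent: extend the previous intent.
                char* newEnd = writes[i].first + writes[i].second;
                last.second = std::max(last.second, unsigned(newEnd - last.first));
            } else {
                // Disjoint: start a new intent.
                intents.push_back(writes[i]);
            }
        }
        return intents;
    }

    int main() {
        static char buf[100];
        std::vector<std::pair<char*, unsigned> > writes;
        writes.push_back(std::make_pair(buf + 10, 5u));   // [10, 15)
        writes.push_back(std::make_pair(buf + 0, 12u));   // [0, 12), overlaps the above
        writes.push_back(std::make_pair(buf + 40, 8u));   // disjoint

        std::vector<Intent> intents = coalesce(writes);
        assert(intents.size() == 2);
        assert(intents[0].first == buf && intents[0].second == 15);
        assert(intents[1].first == buf + 40 && intents[1].second == 8);
        return 0;
    }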
- void DurRecoveryUnit::rollbackInnermostChanges() {
- // Using signed ints to avoid issues in loops below around index 0.
- invariant(_changes.size() <= size_t(std::numeric_limits<int>::max()));
- invariant(_writes.size() <= size_t(std::numeric_limits<int>::max()));
- const int changesRollbackTo = _startOfUncommittedChangesForLevel.back().changeIndex;
- const int writesRollbackTo = _startOfUncommittedChangesForLevel.back().writeIndex;
+ void DurRecoveryUnit::resetChanges() {
+ _writeCount = 0;
+ _writeBytes = 0;
+ _initialWrites.clear();
+ _mergedWrites.clear();
+ _changes.clear();
+ _preimageBuffer.clear();
+ }
+
+ void DurRecoveryUnit::rollbackChanges() {
+ invariant(inOutermostUnitOfWork());
+ invariant(!_startOfUncommittedChangesForLevel.back().changeIndex);
+ invariant(!_startOfUncommittedChangesForLevel.back().writeCount);
// First rollback disk writes, then Changes. This matches behavior in other storage engines
// that either rollback a transaction or don't write a writebatch.
- if (!_rollbackDisabled) {
- LOG(2) << " ***** ROLLING BACK " << (_writes.size() - writesRollbackTo)
- << " disk writes";
+ if (_rollbackWritesDisabled) {
+ LOG(2) << " ***** NOT ROLLING BACK " << _writeCount << " disk writes";
+ }
+ else {
+ LOG(2) << " ***** ROLLING BACK " << _writeCount << " disk writes";
+
+ // First roll back the merged writes. These have no overlap or ordering requirement
+ // other than needing to be rolled back before all _initialWrites.
+ for (MergedWrites::iterator it = _mergedWrites.begin();
+ it != _mergedWrites.end();
+ ++it) {
+ _preimageBuffer.copy(it->addr, it->len, it->offset);
+ }
- for (int i = _writes.size() - 1; i >= writesRollbackTo; i--) {
- // TODO need to add these pages to our "dirty count" somehow.
- _preimageBuffer.copy(_writes[i].addr, _writes[i].len, _writes[i].offset);
+ // Then roll back the initial writes in LIFO order, as these might have overlaps.
+ for (InitialWrites::reverse_iterator rit = _initialWrites.rbegin();
+ rit != _initialWrites.rend();
+ ++rit) {
+ _preimageBuffer.copy(rit->addr, rit->len, rit->offset);
}
}
- LOG(2) << " ***** ROLLING BACK " << (_changes.size() - changesRollbackTo)
- << " custom changes";
- for (int i = _changes.size() - 1; i >= changesRollbackTo; i--) {
+ LOG(2) << " ***** ROLLING BACK " << (_changes.size()) << " custom changes";
+
+ for (int i = _changes.size() - 1; i >= 0; i--) {
LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i]));
_changes[i]->rollback();
}
- _writes.erase(_writes.begin() + writesRollbackTo, _writes.end());
- _changes.erase(_changes.begin() + changesRollbackTo, _changes.end());
-
- if (inOutermostUnitOfWork()) {
- // We just rolled back so we are now "clean" and don't need to roll back anymore.
- invariant(_changes.empty());
- invariant(_writes.empty());
- _preimageBuffer.clear();
- _mustRollback = false;
- }
- else {
- // Inner UOW rolled back, so outer must not commit. We can loosen this in the future,
- // but that would require all StorageEngines to support rollback of nested transactions.
- _mustRollback = true;
- }
+ resetChanges();
+ _mustRollback = false;
}
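rollbackChanges() restores memory from pre-images saved in one shared buffer: merged writes never overlap and can be restored in any order, while the initial writes are restored newest-first so that the oldest pre-image of any overlapping region wins. A minimal sketch of that LIFO copy-back, with a hypothetical PreimageRecord standing in for Write:

    #include <cassert>
    #include <cstring>
    #include <string>
    #include <vector>

    // Each record remembers where the data lives, how long it is, and where
    // its pre-image was appended inside one shared preimage buffer.
    struct PreimageRecord {
        char* addr;
        size_t len;
        size_t offset;  // position of the saved bytes in the preimage buffer
    };

    // Restore saved bytes newest-first (LIFO). If two records overlap, the
    // older record's pre-image is applied last and therefore wins, which is
    // the state that existed before either write happened.
    void rollbackLifo(const std::vector<PreimageRecord>& records,
                      const std::string& preimageBuffer) {
        for (size_t i = records.size(); i-- > 0;) {
            preimageBuffer.copy(records[i].addr, records[i].len, records[i].offset);
        }
    }

    int main() {
        char data[8] = "AAAAAAA";
        std::string preimages;
        std::vector<PreimageRecord> records;

        // First write: save "AAAA" covering data[0..4), then overwrite it.
        records.push_back(PreimageRecord{data, 4, preimages.size()});
        preimages.append(data, 4);
        std::memcpy(data, "BBBB", 4);

        // Second, overlapping write: save data[2..5), then overwrite it.
        records.push_back(PreimageRecord{data + 2, 3, preimages.size()});
        preimages.append(data + 2, 3);
        std::memcpy(data + 2, "CCC", 3);

        rollbackLifo(records, preimages);
        assert(std::memcmp(data, "AAAAAAA", 7) == 0);  // original bytes restored
        return 0;
    }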
bool DurRecoveryUnit::awaitCommit() {
@@ -196,26 +237,115 @@ namespace mongo {
return getDur().awaitCommit();
}
- void* DurRecoveryUnit::writingPtr(void* data, size_t len) {
+ void DurRecoveryUnit::mergingWritingPtr(char* addr, size_t len) {
+ // The invariant is that all writes are non-overlapping and non-empty. So, a single
+ // writingPtr call may result in a number of new segments added. At this point, we cannot
+ // in general merge adjacent writes, as that would require inefficient operations on the
+ // preimage buffer.
+
+ MergedWrites::iterator coveringWrite = _mergedWrites.upper_bound(Write(addr, 0, 0));
+
+ char* const end = addr + len;
+ while (addr < end) {
+ dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr);
+
+ // Determine whether addr[0] is already covered by a write or not.
+ // If covered, adjust addr and len to exclude the covered run from addr[0] onwards.
+
+ if (coveringWrite != _mergedWrites.end()) {
+ char* const cwEnd = coveringWrite->end();
+
+ if (coveringWrite->addr <= addr) {
+ // If the covering write begins at or before addr[0], then addr[0] is covered.
+ // While the existing pre-image will not generally be the same as the data
+ // being written now, during rollback only the oldest pre-image matters.
+
+ if (end <= cwEnd) {
+ break; // fully covered
+ }
+
+ addr = cwEnd;
+ coveringWrite++;
+ dassert(coveringWrite == _mergedWrites.end() || coveringWrite->addr >= cwEnd);
+ }
+ }
+ dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr);
+
+ // If the next coveringWrite overlaps, adjust the end of the uncovered region.
+ char* uncoveredEnd = end;
+ if (coveringWrite != _mergedWrites.end() && coveringWrite->addr < end) {
+ uncoveredEnd = coveringWrite->addr;
+ }
+
+ const size_t uncoveredLen = uncoveredEnd - addr;
+ if (uncoveredLen) {
+ // We are writing to a region that hasn't been declared previously.
+ _mergedWrites.insert(Write(addr, uncoveredLen, _preimageBuffer.size()));
+
+ // Windows requires us to adjust the address space *before* we write to anything.
+ privateViews.makeWritable(addr, uncoveredLen);
+
+ if (!_rollbackWritesDisabled) {
+ _preimageBuffer.append(addr, uncoveredLen);
+ }
+ addr = uncoveredEnd;
+ }
+ }
+ }
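mergingWritingPtr() walks the ordered set of existing writes and saves a pre-image only for the sub-ranges of [addr, addr + len) that are not yet covered. A hedged, simplified version of that gap-finding walk, using a std::map keyed by start address instead of the real MergedWrites set:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <map>

    // Non-overlapping covered intervals, keyed by start address, value = length.
    typedef std::map<char*, size_t> Covered;

    // Insert the parts of [addr, addr + len) that are not already covered.
    // Returns the number of bytes newly covered, i.e. the bytes whose
    // pre-image a real implementation would have to save.
    size_t coverUncovered(Covered& covered, char* addr, size_t len) {
        char* const end = addr + len;
        size_t newlyCovered = 0;

        // First interval whose start could overlap the new range.
        Covered::iterator it = covered.lower_bound(addr);
        if (it != covered.begin()) {
            Covered::iterator prev = it;
            --prev;
            if (prev->first + prev->second > addr)
                it = prev;  // the previous interval already covers addr
        }

        while (addr < end) {
            if (it != covered.end() && it->first <= addr) {
                // addr is inside an existing interval: skip the covered run.
                addr = std::max(addr, it->first + it->second);
                ++it;
                continue;
            }
            // Uncovered gap runs until the next interval (or the end of the range).
            char* gapEnd = (it != covered.end() && it->first < end) ? it->first : end;
            covered[addr] = gapEnd - addr;
            newlyCovered += gapEnd - addr;
            addr = gapEnd;
        }
        return newlyCovered;
    }

    int main() {
        static char buf[100];
        Covered covered;
        covered[buf + 10] = 10;  // [10, 20) already covered

        // New write [5, 25): only [5, 10) and [20, 25) need new pre-images.
        size_t added = coverUncovered(covered, buf + 5, 20);
        assert(added == 10);
        assert(covered.count(buf + 5) && covered[buf + 5] == 5);
        assert(covered.count(buf + 20) && covered[buf + 20] == 5);
        return 0;
    }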
+
+ void* DurRecoveryUnit::writingPtr(void* addr, size_t len) {
invariant(inAUnitOfWork());
- if (len == 0) return data; // Don't need to do anything for empty ranges.
+ if (len == 0) {
+ return addr; // Don't need to do anything for empty ranges.
+ }
+
invariant(len < size_t(std::numeric_limits<int>::max()));
+ _writeCount++;
+ _writeBytes += len;
+ char* const data = static_cast<char*>(addr);
+
+ // The initial writes are stored in a faster, but less memory-efficient way. This will
+ // typically be enough for simple operations, where the extra cost of incremental
+ // coalescing and merging would be too much. For larger writes, more redundancy
+ // is expected, so the cost of checking for duplicates is offset by savings in copying
+ // and allocating preimage buffers. Total memory use of the preimage buffer may be up to
+ // kMaxUnmergedPreimageBytes larger than the amount of memory covered by the write intents.
+
+#ifdef _DEBUG
+ const size_t kMaxUnmergedPreimageBytes = 16*1024;
+#else
+ const size_t kMaxUnmergedPreimageBytes = 10*1024*1024;
+#endif
+
+ if (_preimageBuffer.size() + len > kMaxUnmergedPreimageBytes) {
+ mergingWritingPtr(data, len);
+
+ // After a merged write, no more initial writes can occur or there would be an
+ // ordering violation during rollback. So, ensure that the if-condition will be true
+ // for any future write regardless of length. This is true now because
+ // mergingWritingPtr will store its first write in _preimageBuffer as well.
+ invariant(_preimageBuffer.size() >= kMaxUnmergedPreimageBytes);
+
+ return addr;
+ }
+
// Windows requires us to adjust the address space *before* we write to anything.
privateViews.makeWritable(data, len);
- _writes.push_back(Write(static_cast<char*>(data), len, _preimageBuffer.size()));
- if (!_rollbackDisabled) {
- _preimageBuffer.append(static_cast<char*>(data), len);
+ _initialWrites.push_back(Write(data, len, _preimageBuffer.size()));
+
+ if (!_rollbackWritesDisabled) {
+ _preimageBuffer.append(data, len);
}
- return data;
+ return addr;
}
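writingPtr() stays on a cheap append-only path until the pre-image buffer would exceed kMaxUnmergedPreimageBytes, then hands everything to the deduplicating mergingWritingPtr() path. A toy illustration of that threshold-based tiering, with hypothetical names, a deliberately tiny threshold, and deduplication by start address only (the real slow path splits partially covered ranges):

    #include <cassert>
    #include <cstddef>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    // Toy two-tier recorder: below the threshold every declared range is
    // appended blindly (fast, possibly redundant); above it, ranges are
    // deduplicated so the saved-bytes buffer stops growing for repeated
    // writes to the same region.
    class TieredRecorder {
    public:
        explicit TieredRecorder(size_t threshold) : _threshold(threshold) {}

        void declareWrite(char* addr, size_t len) {
            if (_saved.size() + len <= _threshold) {
                _fastPath.push_back(std::make_pair(addr, len));  // may duplicate
                _saved.append(addr, len);
                return;
            }
            // Slow path: only save ranges not seen before.
            if (_seen.insert(addr).second)
                _saved.append(addr, len);
        }

        size_t savedBytes() const { return _saved.size(); }

    private:
        const size_t _threshold;
        std::vector<std::pair<char*, size_t> > _fastPath;
        std::set<char*> _seen;
        std::string _saved;
    };

    int main() {
        static char buf[64] = "hello world";
        TieredRecorder rec(8);       // tiny threshold for the example

        rec.declareWrite(buf, 4);    // fast path: 4 bytes saved
        rec.declareWrite(buf, 4);    // still fast path: duplicate saved (8 total)
        rec.declareWrite(buf, 4);    // over threshold: first dedup save (12 total)
        rec.declareWrite(buf, 4);    // already seen: no further growth

        assert(rec.savedBytes() == 12);
        return 0;
    }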
void DurRecoveryUnit::setRollbackWritesDisabled() {
invariant(inOutermostUnitOfWork());
- _rollbackDisabled = true;
+ _rollbackWritesDisabled = true;
}
void DurRecoveryUnit::registerChange(Change* change) {