diff options
Diffstat (limited to 'src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp')
-rw-r--r-- | src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp | 432 |
1 files changed, 210 insertions, 222 deletions
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp index e826277e7ff..0c9f58988e2 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp +++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp @@ -45,284 +45,272 @@ namespace mongo { - DurRecoveryUnit::DurRecoveryUnit() - : _writeCount(0), _writeBytes(0), _inUnitOfWork(false), _rollbackWritesDisabled(false) { - } +DurRecoveryUnit::DurRecoveryUnit() + : _writeCount(0), _writeBytes(0), _inUnitOfWork(false), _rollbackWritesDisabled(false) {} - void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { - invariant(!_inUnitOfWork); - _inUnitOfWork = true; - } +void DurRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { + invariant(!_inUnitOfWork); + _inUnitOfWork = true; +} - void DurRecoveryUnit::commitUnitOfWork() { - invariant(_inUnitOfWork); +void DurRecoveryUnit::commitUnitOfWork() { + invariant(_inUnitOfWork); - commitChanges(); + commitChanges(); - // global journal flush opportunity - getDur().commitIfNeeded(); + // global journal flush opportunity + getDur().commitIfNeeded(); - resetChanges(); - } + resetChanges(); +} - void DurRecoveryUnit::abortUnitOfWork() { - invariant(_inUnitOfWork); +void DurRecoveryUnit::abortUnitOfWork() { + invariant(_inUnitOfWork); - rollbackChanges(); - resetChanges(); - } + rollbackChanges(); + resetChanges(); +} - void DurRecoveryUnit::abandonSnapshot() { - invariant(!_inUnitOfWork); - // no-op since we have no transaction - } +void DurRecoveryUnit::abandonSnapshot() { + invariant(!_inUnitOfWork); + // no-op since we have no transaction +} - void DurRecoveryUnit::commitChanges() { - if (getDur().isDurable()) - markWritesForJournaling(); +void DurRecoveryUnit::commitChanges() { + if (getDur().isDurable()) + markWritesForJournaling(); - try { - for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); - it != end; ++it) { - (*it)->commit(); - } - } - catch (...) { - std::terminate(); + try { + for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) { + (*it)->commit(); } + } catch (...) { + std::terminate(); } - - void DurRecoveryUnit::markWritesForJournaling() { - if (!_writeCount) - return; - - typedef std::pair<void*, unsigned> Intent; - std::vector<Intent> intents; - const size_t numStoredWrites = _initialWrites.size() + _mergedWrites.size(); - intents.reserve(numStoredWrites); - - // Show very large units of work at LOG(1) level as they may hint at performance issues - const int logLevel = (_writeCount > 100*1000 || _writeBytes > 50*1024*1024) ? 1 : 3; - - LOG(logLevel) << _writeCount << " writes (" << _writeBytes / 1024 << " kB) covered by " - << numStoredWrites << " pre-images (" - << _preimageBuffer.size() / 1024 << " kB) "; - - // orders the initial, unmerged writes, by address so we can coalesce overlapping and - // adjacent writes - std::sort(_initialWrites.begin(), _initialWrites.end()); - - if (!_initialWrites.empty()) { - intents.push_back(std::make_pair(_initialWrites.front().addr, - _initialWrites.front().len)); - for (InitialWrites::iterator it = (_initialWrites.begin() + 1), - end = _initialWrites.end(); - it != end; - ++it) { - Intent& lastIntent = intents.back(); - char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second; - if (it->addr <= lastEnd) { - // overlapping or adjacent, so extend. - ptrdiff_t extendedLen = (it->end()) - static_cast<char*>(lastIntent.first); - lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen)); - } - else { - // not overlapping, so create a new intent - intents.push_back(std::make_pair(it->addr, it->len)); - } +} + +void DurRecoveryUnit::markWritesForJournaling() { + if (!_writeCount) + return; + + typedef std::pair<void*, unsigned> Intent; + std::vector<Intent> intents; + const size_t numStoredWrites = _initialWrites.size() + _mergedWrites.size(); + intents.reserve(numStoredWrites); + + // Show very large units of work at LOG(1) level as they may hint at performance issues + const int logLevel = (_writeCount > 100 * 1000 || _writeBytes > 50 * 1024 * 1024) ? 1 : 3; + + LOG(logLevel) << _writeCount << " writes (" << _writeBytes / 1024 << " kB) covered by " + << numStoredWrites << " pre-images (" << _preimageBuffer.size() / 1024 << " kB) "; + + // orders the initial, unmerged writes, by address so we can coalesce overlapping and + // adjacent writes + std::sort(_initialWrites.begin(), _initialWrites.end()); + + if (!_initialWrites.empty()) { + intents.push_back(std::make_pair(_initialWrites.front().addr, _initialWrites.front().len)); + for (InitialWrites::iterator it = (_initialWrites.begin() + 1), end = _initialWrites.end(); + it != end; + ++it) { + Intent& lastIntent = intents.back(); + char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second; + if (it->addr <= lastEnd) { + // overlapping or adjacent, so extend. + ptrdiff_t extendedLen = (it->end()) - static_cast<char*>(lastIntent.first); + lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen)); + } else { + // not overlapping, so create a new intent + intents.push_back(std::make_pair(it->addr, it->len)); } } + } - MergedWrites::iterator it = _mergedWrites.begin(); - if (it != _mergedWrites.end()) { - intents.push_back(std::make_pair(it->addr, it->len)); - while (++it != _mergedWrites.end()) { - // Check the property that write intents are sorted and don't overlap. - invariant(it->addr >= intents.back().first); - Intent& lastIntent = intents.back(); - char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second; - if (it->addr == lastEnd) { - // adjacent, so extend. - lastIntent.second += it->len; - } - else { - // not overlapping, so create a new intent - invariant(it->addr > lastEnd); - intents.push_back(std::make_pair(it->addr, it->len)); - } + MergedWrites::iterator it = _mergedWrites.begin(); + if (it != _mergedWrites.end()) { + intents.push_back(std::make_pair(it->addr, it->len)); + while (++it != _mergedWrites.end()) { + // Check the property that write intents are sorted and don't overlap. + invariant(it->addr >= intents.back().first); + Intent& lastIntent = intents.back(); + char* lastEnd = static_cast<char*>(lastIntent.first) + lastIntent.second; + if (it->addr == lastEnd) { + // adjacent, so extend. + lastIntent.second += it->len; + } else { + // not overlapping, so create a new intent + invariant(it->addr > lastEnd); + intents.push_back(std::make_pair(it->addr, it->len)); } } - LOG(logLevel) << _mergedWrites.size() << " pre-images " << "coalesced into " - << intents.size() << " write intents"; - - getDur().declareWriteIntents(intents); - } - - void DurRecoveryUnit::resetChanges() { - _writeCount = 0; - _writeBytes = 0; - _initialWrites.clear(); - _mergedWrites.clear(); - _changes.clear(); - _preimageBuffer.clear(); - _rollbackWritesDisabled = false; - _inUnitOfWork = false; } - - void DurRecoveryUnit::rollbackChanges() { - // First rollback disk writes, then Changes. This matches behavior in other storage engines - // that either rollback a transaction or don't write a writebatch. - - if (_rollbackWritesDisabled) { - LOG(2) << " ***** NOT ROLLING BACK " << _writeCount << " disk writes"; + LOG(logLevel) << _mergedWrites.size() << " pre-images " + << "coalesced into " << intents.size() << " write intents"; + + getDur().declareWriteIntents(intents); +} + +void DurRecoveryUnit::resetChanges() { + _writeCount = 0; + _writeBytes = 0; + _initialWrites.clear(); + _mergedWrites.clear(); + _changes.clear(); + _preimageBuffer.clear(); + _rollbackWritesDisabled = false; + _inUnitOfWork = false; +} + +void DurRecoveryUnit::rollbackChanges() { + // First rollback disk writes, then Changes. This matches behavior in other storage engines + // that either rollback a transaction or don't write a writebatch. + + if (_rollbackWritesDisabled) { + LOG(2) << " ***** NOT ROLLING BACK " << _writeCount << " disk writes"; + } else { + LOG(2) << " ***** ROLLING BACK " << _writeCount << " disk writes"; + + // First roll back the merged writes. These have no overlap or ordering requirement + // other than needing to be rolled back before all _initialWrites. + for (MergedWrites::iterator it = _mergedWrites.begin(); it != _mergedWrites.end(); ++it) { + _preimageBuffer.copy(it->addr, it->len, it->offset); } - else { - LOG(2) << " ***** ROLLING BACK " << _writeCount << " disk writes"; - - // First roll back the merged writes. These have no overlap or ordering requirement - // other than needing to be rolled back before all _initialWrites. - for (MergedWrites::iterator it = _mergedWrites.begin(); - it != _mergedWrites.end(); - ++it) { - _preimageBuffer.copy(it->addr, it->len, it->offset); - } - // Then roll back the initial writes in LIFO order, as these might have overlaps. - for (InitialWrites::reverse_iterator rit = _initialWrites.rbegin(); - rit != _initialWrites.rend(); - ++rit) { - _preimageBuffer.copy(rit->addr, rit->len, rit->offset); - } + // Then roll back the initial writes in LIFO order, as these might have overlaps. + for (InitialWrites::reverse_iterator rit = _initialWrites.rbegin(); + rit != _initialWrites.rend(); + ++rit) { + _preimageBuffer.copy(rit->addr, rit->len, rit->offset); } + } - LOG(2) << " ***** ROLLING BACK " << (_changes.size()) << " custom changes"; + LOG(2) << " ***** ROLLING BACK " << (_changes.size()) << " custom changes"; - try { - for (int i = _changes.size() - 1; i >= 0; i--) { - LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i])); - _changes[i]->rollback(); - } - } - catch (...) { - std::terminate(); + try { + for (int i = _changes.size() - 1; i >= 0; i--) { + LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i])); + _changes[i]->rollback(); } + } catch (...) { + std::terminate(); } +} - bool DurRecoveryUnit::waitUntilDurable() { - invariant(!_inUnitOfWork); - return getDur().waitUntilDurable(); - } +bool DurRecoveryUnit::waitUntilDurable() { + invariant(!_inUnitOfWork); + return getDur().waitUntilDurable(); +} - void DurRecoveryUnit::mergingWritingPtr(char* addr, size_t len) { - // The invariant is that all writes are non-overlapping and non-empty. So, a single - // writingPtr call may result in a number of new segments added. At this point, we cannot - // in general merge adjacent writes, as that would require inefficient operations on the - // preimage buffer. +void DurRecoveryUnit::mergingWritingPtr(char* addr, size_t len) { + // The invariant is that all writes are non-overlapping and non-empty. So, a single + // writingPtr call may result in a number of new segments added. At this point, we cannot + // in general merge adjacent writes, as that would require inefficient operations on the + // preimage buffer. - MergedWrites::iterator coveringWrite = _mergedWrites.upper_bound(Write(addr, 0, 0)); + MergedWrites::iterator coveringWrite = _mergedWrites.upper_bound(Write(addr, 0, 0)); - char* const end = addr + len; - while (addr < end) { - dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr); + char* const end = addr + len; + while (addr < end) { + dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr); - // Determine whether addr[0] is already covered by a write or not. - // If covered, adjust addr and len to exclude the covered run from addr[0] onwards. + // Determine whether addr[0] is already covered by a write or not. + // If covered, adjust addr and len to exclude the covered run from addr[0] onwards. - if (coveringWrite != _mergedWrites.end()) { - char* const cwEnd = coveringWrite->end(); + if (coveringWrite != _mergedWrites.end()) { + char* const cwEnd = coveringWrite->end(); - if (coveringWrite->addr <= addr) { - // If the begin of the covering write at or before addr[0], addr[0] is covered. - // While the existing pre-image will not generally be the same as the data - // being written now, during rollback only the oldest pre-image matters. + if (coveringWrite->addr <= addr) { + // If the begin of the covering write at or before addr[0], addr[0] is covered. + // While the existing pre-image will not generally be the same as the data + // being written now, during rollback only the oldest pre-image matters. - if (end <= cwEnd) { - break; // fully covered - } - - addr = cwEnd; - coveringWrite++; - dassert(coveringWrite == _mergedWrites.end() || coveringWrite->addr >= cwEnd); + if (end <= cwEnd) { + break; // fully covered } - } - dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr); - // If the next coveringWrite overlaps, adjust the end of the uncovered region. - char* uncoveredEnd = end; - if (coveringWrite != _mergedWrites.end() && coveringWrite->addr < end) { - uncoveredEnd = coveringWrite->addr; + addr = cwEnd; + coveringWrite++; + dassert(coveringWrite == _mergedWrites.end() || coveringWrite->addr >= cwEnd); } + } + dassert(coveringWrite == _mergedWrites.end() || coveringWrite->end() > addr); - const size_t uncoveredLen = uncoveredEnd - addr; - if (uncoveredLen) { - // We are writing to a region that hasn't been declared previously. - _mergedWrites.insert(Write(addr, uncoveredLen, _preimageBuffer.size())); + // If the next coveringWrite overlaps, adjust the end of the uncovered region. + char* uncoveredEnd = end; + if (coveringWrite != _mergedWrites.end() && coveringWrite->addr < end) { + uncoveredEnd = coveringWrite->addr; + } - // Windows requires us to adjust the address space *before* we write to anything. - privateViews.makeWritable(addr, uncoveredLen); + const size_t uncoveredLen = uncoveredEnd - addr; + if (uncoveredLen) { + // We are writing to a region that hasn't been declared previously. + _mergedWrites.insert(Write(addr, uncoveredLen, _preimageBuffer.size())); - if (!_rollbackWritesDisabled) { - _preimageBuffer.append(addr, uncoveredLen); - } - addr = uncoveredEnd; + // Windows requires us to adjust the address space *before* we write to anything. + privateViews.makeWritable(addr, uncoveredLen); + + if (!_rollbackWritesDisabled) { + _preimageBuffer.append(addr, uncoveredLen); } + addr = uncoveredEnd; } } +} - void* DurRecoveryUnit::writingPtr(void* addr, size_t len) { - invariant(_inUnitOfWork); - - if (len == 0) { - return addr; // Don't need to do anything for empty ranges. - } +void* DurRecoveryUnit::writingPtr(void* addr, size_t len) { + invariant(_inUnitOfWork); - invariant(len < size_t(std::numeric_limits<int>::max())); + if (len == 0) { + return addr; // Don't need to do anything for empty ranges. + } - _writeCount++; - _writeBytes += len; - char* const data = static_cast<char*>(addr); + invariant(len < size_t(std::numeric_limits<int>::max())); - // The initial writes are stored in a faster, but less memory-efficient way. This will - // typically be enough for simple operations, where the extra cost of incremental - // coalescing and merging would be too much. For larger writes, more redundancy is - // is expected, so the cost of checking for duplicates is offset by savings in copying - // and allocating preimage buffers. Total memory use of the preimage buffer may be up to - // kMaxUnmergedPreimageBytes larger than the amount memory covered by the write intents. + _writeCount++; + _writeBytes += len; + char* const data = static_cast<char*>(addr); - const size_t kMaxUnmergedPreimageBytes = kDebugBuild ? 16*1024 : 10*1024*1024; + // The initial writes are stored in a faster, but less memory-efficient way. This will + // typically be enough for simple operations, where the extra cost of incremental + // coalescing and merging would be too much. For larger writes, more redundancy is + // is expected, so the cost of checking for duplicates is offset by savings in copying + // and allocating preimage buffers. Total memory use of the preimage buffer may be up to + // kMaxUnmergedPreimageBytes larger than the amount memory covered by the write intents. - if (_preimageBuffer.size() + len > kMaxUnmergedPreimageBytes) { - mergingWritingPtr(data, len); + const size_t kMaxUnmergedPreimageBytes = kDebugBuild ? 16 * 1024 : 10 * 1024 * 1024; - // After a merged write, no more initial writes can occur or there would be an - // ordering violation during rollback. So, ensure that the if-condition will be true - // for any future write regardless of length. This is true now because - // mergingWritingPtr also will store its first write in _preimageBuffer as well. - invariant(_preimageBuffer.size() >= kMaxUnmergedPreimageBytes); + if (_preimageBuffer.size() + len > kMaxUnmergedPreimageBytes) { + mergingWritingPtr(data, len); - return addr; - } + // After a merged write, no more initial writes can occur or there would be an + // ordering violation during rollback. So, ensure that the if-condition will be true + // for any future write regardless of length. This is true now because + // mergingWritingPtr also will store its first write in _preimageBuffer as well. + invariant(_preimageBuffer.size() >= kMaxUnmergedPreimageBytes); - // Windows requires us to adjust the address space *before* we write to anything. - privateViews.makeWritable(data, len); + return addr; + } - _initialWrites.push_back(Write(data, len, _preimageBuffer.size())); + // Windows requires us to adjust the address space *before* we write to anything. + privateViews.makeWritable(data, len); - if (!_rollbackWritesDisabled) { - _preimageBuffer.append(data, len); - } + _initialWrites.push_back(Write(data, len, _preimageBuffer.size())); - return addr; + if (!_rollbackWritesDisabled) { + _preimageBuffer.append(data, len); } - void DurRecoveryUnit::setRollbackWritesDisabled() { - invariant(_inUnitOfWork); - _rollbackWritesDisabled = true; - } + return addr; +} - void DurRecoveryUnit::registerChange(Change* change) { - invariant(_inUnitOfWork); - _changes.push_back(change); - } +void DurRecoveryUnit::setRollbackWritesDisabled() { + invariant(_inUnitOfWork); + _rollbackWritesDisabled = true; +} + +void DurRecoveryUnit::registerChange(Change* change) { + invariant(_inUnitOfWork); + _changes.push_back(change); +} } // namespace mongo |