/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
#include "mongo/platform/basic.h"
#include "mongo/db/storage/mmap_v1/dur_recovery_unit.h"
#include <algorithm>
#include <limits>
#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/db/storage/mmap_v1/durable_mapped_file.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
namespace mongo {
/**
 * Constructs a recovery unit with the "must roll back" flag cleared.
 */
DurRecoveryUnit::DurRecoveryUnit()
    : _mustRollback(false) {}
void DurRecoveryUnit::beginUnitOfWork() {
_startOfUncommittedChangesForLevel.push_back(Indexes(_changes.size(), _writes.size()));
}
/**
 * Commits the current unit of work.
 *
 * Must be called inside a unit of work and only while no rollback is pending. At the
 * outermost level the accumulated changes are committed and the durability subsystem is
 * given a chance to flush. At a nested level nothing is committed yet; the level's changes
 * are simply handed over to the enclosing unit of work.
 */
void DurRecoveryUnit::commitUnitOfWork() {
    invariant(inAUnitOfWork());
    invariant(!_mustRollback);

    if (inOutermostUnitOfWork()) {
        commitChanges();

        // global journal flush opportunity
        getDur().commitIfNeeded();
        return;
    }

    // Nested case: make everything recorded at this level part of the containing
    // UnitOfWork by moving this level's start marker to the current end of the lists.
    // The changes reach the global damages list only when the outermost UnitOfWork
    // commits, which it must now do.
    if (haveUncommitedChangesAtCurrentLevel()) {
        _startOfUncommittedChangesForLevel.back() = Indexes(_changes.size(), _writes.size());
    }
}
/**
 * Closes the innermost unit of work.
 *
 * If the level still has uncommitted changes they are rolled back first; the level's
 * start marker is then popped.
 */
void DurRecoveryUnit::endUnitOfWork() {
    invariant(inAUnitOfWork());

    const bool needsRollback = haveUncommitedChangesAtCurrentLevel();
    if (needsRollback)
        rollbackInnermostChanges();

    _startOfUncommittedChangesForLevel.pop_back();
}
/**
 * May only be called outside of a unit of work. There is no transaction at that point,
 * so apart from checking that precondition this is a no-op.
 */
void DurRecoveryUnit::commitAndRestart() {
    invariant(!inAUnitOfWork());
}
void DurRecoveryUnit::commitChanges() {
if (!inAUnitOfWork())
return;
invariant(!_mustRollback);
invariant(inOutermostUnitOfWork());
invariant(_startOfUncommittedChangesForLevel.front().changeIndex == 0);
invariant(_startOfUncommittedChangesForLevel.front().writeIndex == 0);
if (getDur().isDurable())
pushChangesToDurSubSystem();
for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
(*it)->commit();
}
// We now reset to a "clean" state without any uncommited changes.
_changes.clear();
_writes.clear();
_preimageBuffer.clear();
}
void DurRecoveryUnit::pushChangesToDurSubSystem() {
if (_writes.empty())
return;
typedef std::pair Intent;
std::vector intents;
intents.reserve(_writes.size());
// orders by addr so we can coalesce overlapping and adjacent writes
std::sort(_writes.begin(), _writes.end());
intents.push_back(std::make_pair(_writes.front().addr, _writes.front().len));
for (Writes::iterator it = (_writes.begin() + 1), end = _writes.end(); it != end; ++it) {
Intent& lastIntent = intents.back();
char* lastEnd = static_cast(lastIntent.first) + lastIntent.second;
if (it->addr <= lastEnd) {
// overlapping or adjacent, so extend.
ptrdiff_t extendedLen = (it->addr + it->len) - static_cast(lastIntent.first);
lastIntent.second = std::max(lastIntent.second, unsigned(extendedLen));
}
else {
// not overlapping, so create a new intent
intents.push_back(std::make_pair(it->addr, it->len));
}
}
getDur().declareWriteIntents(intents);
}
/**
 * Undoes everything recorded since the innermost unit of work began.
 *
 * Disk writes are undone first by copying the saved bytes from _preimageBuffer back to
 * their addresses, newest first. Then each registered Change is asked to roll back, also
 * newest first. The undone entries are erased from _writes and _changes afterwards.
 *
 * At the outermost level this leaves the recovery unit clean and clears _mustRollback.
 * At a nested level it sets _mustRollback, so the enclosing unit of work cannot commit.
 */
void DurRecoveryUnit::rollbackInnermostChanges() {
    // Using signed ints to avoid issues in loops below around index 0.
    // The invariants guarantee that both sizes fit in the int loop indexes used below.
    invariant(_changes.size() <= size_t(std::numeric_limits<int>::max()));
    invariant(_writes.size() <= size_t(std::numeric_limits<int>::max()));
    const int changesRollbackTo = _startOfUncommittedChangesForLevel.back().changeIndex;
    const int writesRollbackTo = _startOfUncommittedChangesForLevel.back().writeIndex;

    LOG(2) << " ***** ROLLING BACK " << (_writes.size() - writesRollbackTo) << " disk writes"
           << " and " << (_changes.size() - changesRollbackTo) << " custom changes";

    // First rollback disk writes, then Changes. This matches behavior in other storage engines
    // that either rollback a transaction or don't write a writebatch.
    for (int i = _writes.size() - 1; i >= writesRollbackTo; i--) {
        // TODO need to add these pages to our "dirty count" somehow.
        // Copy the len bytes saved at the write's offset in the preimage buffer back to
        // the written address.
        _preimageBuffer.copy(_writes[i].addr, _writes[i].len, _writes[i].offset);
    }

    for (int i = _changes.size() - 1; i >= changesRollbackTo; i--) {
        LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i]));
        _changes[i]->rollback();
    }

    // Forget the entries that were just undone.
    _writes.erase(_writes.begin() + writesRollbackTo, _writes.end());
    _changes.erase(_changes.begin() + changesRollbackTo, _changes.end());

    if (inOutermostUnitOfWork()) {
        // We just rolled back so we are now "clean" and don't need to roll back anymore.
        invariant(_changes.empty());
        invariant(_writes.empty());
        _preimageBuffer.clear();
        _mustRollback = false;
    }
    else {
        // Inner UOW rolled back, so outer must not commit. We can loosen this in the future,
        // but that would require all StorageEngines to support rollback of nested transactions.
        _mustRollback = true;
    }
}
/**
 * Forwards to getDur().awaitCommit() and returns its result. May only be called outside
 * of a unit of work.
 */
bool DurRecoveryUnit::awaitCommit() {
    invariant(!inAUnitOfWork());

    const bool result = getDur().awaitCommit();
    return result;
}
/**
 * Declares that the caller is about to modify the len bytes starting at data.
 *
 * The range is made writable, recorded in _writes, and its current contents are appended
 * to _preimageBuffer so that a later rollback can restore them. Must be called inside a
 * unit of work.
 *
 * @param data start of the range that will be written.
 * @param len  size of the range in bytes; a zero-length range is ignored.
 * @return data, unchanged.
 */
void* DurRecoveryUnit::writingPtr(void* data, size_t len) {
    invariant(inAUnitOfWork());

    if (len == 0)
        return data;  // Don't need to do anything for empty ranges.

    // NOTE(review): the bound's type was reconstructed as int -- confirm against the
    // width of Write's length field.
    invariant(len < size_t(std::numeric_limits<int>::max()));

    // Windows requires us to adjust the address space *before* we write to anything.
    privateViews.makeWritable(data, len);

    // The write's offset is the preimage buffer size *before* the append, i.e. the
    // position at which this range's original bytes are stored.
    char* const bytes = static_cast<char*>(data);
    _writes.push_back(Write(bytes, len, _preimageBuffer.size()));
    _preimageBuffer.append(bytes, len);

    return data;
}
/**
 * Appends change to the list of changes tracked by the current unit of work. Must be
 * called inside a unit of work.
 *
 * NOTE(review): presumably _changes takes ownership of the pointer -- confirm against the
 * declaration of Changes.
 */
void DurRecoveryUnit::registerChange(Change* change) {
    invariant(inAUnitOfWork());

    _changes.push_back(change);
}
} // namespace mongo