/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/replication_recovery.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface,
ReplicationConsistencyMarkers* consistencyMarkers)
: _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {}
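
// Performs replication recovery: truncate any oplog entries past the truncate-after point,
// pick a recovery start point (the stable timestamp if one is available, otherwise the
// appliedThrough marker), and replay oplog entries from that point to the top of the oplog.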
void ReplicationRecoveryImpl::recoverFromOplog(OperationContext* opCtx,
boost::optional<Timestamp> stableTimestamp) try {
if (_consistencyMarkers->getInitialSyncFlag(opCtx)) {
log() << "No recovery needed. Initial sync flag set.";
return; // Initial Sync will take over so no cleanup is needed.
}
const auto truncateAfterPoint = _consistencyMarkers->getOplogTruncateAfterPoint(opCtx);
if (!truncateAfterPoint.isNull()) {
log() << "Removing unapplied entries starting at: " << truncateAfterPoint.toBSON();
_truncateOplogTo(opCtx, truncateAfterPoint);
// Clear the truncateAfterPoint so that we don't truncate the next batch of oplog entries
// erroneously.
_consistencyMarkers->setOplogTruncateAfterPoint(opCtx, {});
}
auto topOfOplogSW = _getTopOfOplog(opCtx);
if (topOfOplogSW.getStatus() == ErrorCodes::CollectionIsEmpty ||
topOfOplogSW.getStatus() == ErrorCodes::NamespaceNotFound) {
// Oplog is empty. There are no oplog entries to apply, so we exit recovery and go into
// initial sync.
log() << "No oplog entries to apply for recovery. Oplog is empty. Entering initial sync.";
return;
}
fassert(40290, topOfOplogSW);
const auto topOfOplog = topOfOplogSW.getValue();
const auto appliedThrough = _consistencyMarkers->getAppliedThrough(opCtx);
invariant(!stableTimestamp || appliedThrough.isNull() ||
*stableTimestamp == appliedThrough.getTimestamp(),
str::stream() << "Stable timestamp " << stableTimestamp->toString()
<< " does not equal appliedThrough timestamp "
<< appliedThrough.toString());
// If we were passed in a stable timestamp, we are in rollback recovery and should recover from
// that stable timestamp. Otherwise, we're recovering at startup. If this storage engine
// supports recover to stable timestamp, we ask it for the recovery timestamp. If the storage
// engine returns a timestamp, we recover from that point. However, if the storage engine
// returns "none", the storage engine does not have a stable checkpoint and we must recover from
// an unstable checkpoint instead.
const bool supportsRecoverToStableTimestamp =
_storageInterface->supportsRecoverToStableTimestamp(opCtx->getServiceContext());
if (!stableTimestamp && supportsRecoverToStableTimestamp) {
stableTimestamp = _storageInterface->getRecoveryTimestamp(opCtx->getServiceContext());
}
if (stableTimestamp) {
invariant(supportsRecoverToStableTimestamp);
_recoverFromStableTimestamp(opCtx, *stableTimestamp, appliedThrough, topOfOplog);
} else {
_recoverFromUnstableCheckpoint(opCtx, appliedThrough, topOfOplog);
}
} catch (...) {
severe() << "Caught exception during replication recovery: " << exceptionToStatus();
std::terminate();
}
void ReplicationRecoveryImpl::_recoverFromStableTimestamp(OperationContext* opCtx,
Timestamp stableTimestamp,
OpTime appliedThrough,
OpTime topOfOplog) {
invariant(!stableTimestamp.isNull());
invariant(!topOfOplog.isNull());
log() << "Recovering from stable timestamp: " << stableTimestamp
<< " (top of oplog: " << topOfOplog << ", appliedThrough: " << appliedThrough << ")";
log() << "Starting recovery oplog application at the stable timestamp: " << stableTimestamp;
_applyToEndOfOplog(opCtx, stableTimestamp, topOfOplog.getTimestamp());
}
void ReplicationRecoveryImpl::_recoverFromUnstableCheckpoint(OperationContext* opCtx,
OpTime appliedThrough,
OpTime topOfOplog) {
invariant(!topOfOplog.isNull());
log() << "Recovering from an unstable checkpoint (top of oplog: " << topOfOplog
<< ", appliedThrough: " << appliedThrough << ")";
if (appliedThrough.isNull()) {
// The appliedThrough would be null if we shut down cleanly or crashed as a primary. Either
// way we are consistent at the top of the oplog.
log() << "No oplog entries to apply for recovery. appliedThrough is null.";
} else {
// If the appliedThrough is not null, then we shut down uncleanly during secondary oplog
// application and must apply from the appliedThrough to the top of the oplog.
log() << "Starting recovery oplog application at the appliedThrough: " << appliedThrough
<< ", through the top of the oplog: " << topOfOplog;
_applyToEndOfOplog(opCtx, appliedThrough.getTimestamp(), topOfOplog.getTimestamp());
}
}
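
// Applies oplog entries in the range (oplogApplicationStartPoint, topOfOplog]. The start point
// is expected to already be reflected in the data, so the entry at that timestamp is verified
// and skipped rather than reapplied.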
void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
Timestamp oplogApplicationStartPoint,
Timestamp topOfOplog) {
invariant(!oplogApplicationStartPoint.isNull());
invariant(!topOfOplog.isNull());
// Check if we have any unapplied ops in our oplog. It is important that this is done after
// deleting the ragged end of the oplog.
if (oplogApplicationStartPoint == topOfOplog) {
log() << "No oplog entries to apply for recovery. Start point is at the top of the oplog.";
return; // We've applied all the valid oplog we have.
} else if (oplogApplicationStartPoint > topOfOplog) {
severe() << "Applied op " << oplogApplicationStartPoint.toBSON()
<< " not found. Top of oplog is " << topOfOplog.toBSON() << '.';
fassertFailedNoTrace(40313);
}
log() << "Replaying stored operations from " << oplogApplicationStartPoint.toBSON()
<< " (exclusive) to " << topOfOplog.toBSON() << " (inclusive).";
DBDirectClient db(opCtx);
auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(),
QUERY("ts" << BSON("$gte" << oplogApplicationStartPoint)),
/*batchSize*/ 0,
/*skip*/ 0,
/*projection*/ nullptr,
QueryOption_OplogReplay);
// Check that the first document matches our oplog application start point, then skip it since
// it's already been applied.
if (!cursor->more()) {
// This should really be impossible because we check above that the top of the oplog is
// strictly > appliedThrough. If this fails it represents a serious bug in either the
// storage engine or query's implementation of OplogReplay.
severe() << "Couldn't find any entries in the oplog >= "
<< oplogApplicationStartPoint.toBSON() << " which should be impossible.";
fassertFailedNoTrace(40293);
}
auto firstTimestampFound =
fassert(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())).getTimestamp();
if (firstTimestampFound != oplogApplicationStartPoint) {
severe() << "Oplog entry at " << oplogApplicationStartPoint.toBSON()
<< " is missing; actual entry found is " << firstTimestampFound.toBSON();
fassertFailedNoTrace(40292);
}
// Apply remaining ops one at a time, but don't log them because they are already logged.
UnreplicatedWritesBlock uwb(opCtx);
BSONObj entry;
while (cursor->more()) {
entry = cursor->nextSafe();
fassert(40294, SyncTail::syncApply(opCtx, entry, OplogApplication::Mode::kRecovering));
}
// We may crash before setting appliedThrough. If we have a stable checkpoint, we will recover
// to that checkpoint at a replication consistent point, and applying the oplog is safe.
// If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback
// recovery, because we only roll back to a stable timestamp when we have a stable checkpoint.
// Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe
// to replay the batch from any point.
_consistencyMarkers->setAppliedThrough(opCtx,
fassert(40295, OpTime::parseFromOplogEntry(entry)));
}
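
// Returns the OpTime of the newest entry in the oplog by scanning a single document backward
// from the end of the collection.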
StatusWith<OpTime> ReplicationRecoveryImpl::_getTopOfOplog(OperationContext* opCtx) const {
const auto docsSW = _storageInterface->findDocuments(opCtx,
NamespaceString::kRsOplogNamespace,
boost::none, // Collection scan
StorageInterface::ScanDirection::kBackward,
{},
BoundInclusion::kIncludeStartKeyOnly,
1U);
if (!docsSW.isOK()) {
return docsSW.getStatus();
}
const auto docs = docsSW.getValue();
if (docs.empty()) {
return Status(ErrorCodes::CollectionIsEmpty, "oplog is empty");
}
invariant(1U == docs.size());
return OpTime::parseFromOplogEntry(docs.front());
}
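
// Removes all oplog entries with a "ts" greater than or equal to truncateTimestamp by walking
// the oplog backward until it finds the first entry strictly before that timestamp.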
void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx,
Timestamp truncateTimestamp) {
const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace);
AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX);
Lock::CollectionLock oplogCollectionLock(opCtx->lockState(), oplogNss.ns(), MODE_X);
Collection* oplogCollection = autoDb.getDb()->getCollection(opCtx, oplogNss);
if (!oplogCollection) {
fassertFailedWithStatusNoTrace(
34418,
Status(ErrorCodes::NamespaceNotFound,
str::stream() << "Can't find " << NamespaceString::kRsOplogNamespace.ns()));
}
// Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
RecordId oldestIDToDelete; // Non-null if there is something to delete.
auto oplogRs = oplogCollection->getRecordStore();
auto oplogReverseCursor = oplogRs->getCursor(opCtx, /*forward=*/false);
size_t count = 0;
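// The truncate point is expected to be near the end of the oplog, so a reverse scan typically
// only touches the ragged tail that is being removed.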
while (auto next = oplogReverseCursor->next()) {
const BSONObj entry = next->data.releaseToBson();
const RecordId id = next->id;
count++;
const auto tsElem = entry["ts"];
if (count == 1) {
if (tsElem.eoo())
LOG(2) << "Oplog tail entry: " << redact(entry);
else
LOG(2) << "Oplog tail entry ts field: " << tsElem;
}
if (tsElem.timestamp() < truncateTimestamp) {
// If count == 1, that means that we have nothing to delete because everything in the
// oplog is < truncateTimestamp.
if (count != 1) {
invariant(!oldestIDToDelete.isNull());
oplogCollection->cappedTruncateAfter(opCtx, oldestIDToDelete, /*inclusive=*/true);
}
return;
}
oldestIDToDelete = id;
}
severe() << "Reached end of oplog looking for oplog entry before " << truncateTimestamp.toBSON()
<< " but couldn't find any after looking through " << count << " entries.";
fassertFailedNoTrace(40296);
}
} // namespace repl
} // namespace mongo