/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/base/counter.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_pool.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/oplog_interface_remote.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/server_parameters.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
namespace mongo {
using std::string;
namespace repl {
namespace {
// Field of an oplog entry that holds the entry's hash; extracted from the newest local oplog entry
// when bgsync restores its last fetched position.
constexpr char kHashFieldName[] = "h";

// After enqueueing a batch whose network size is positive but below kSmallBatchLimitBytes, the
// fetcher callback sleeps kSleepToAllowBatchingMillis milliseconds so more operations accumulate.
constexpr int kSleepToAllowBatchingMillis = 2;
constexpr int kSmallBatchLimitBytes = 40000;

// Socket timeout (10 minutes) for connections to the sync source opened while running rollback.
const Milliseconds kRollbackOplogSocketTimeout(10 * 60 * 1000);
// Default batchSize for the OplogFetcher's find/getMore queries: the 16MB maximum batch size
// divided by the 12-byte minimum document size, multiplied by 10 for good measure.
constexpr auto defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
// The batchSize to use for the find/getMore queries called by the OplogFetcher.
// Registered with the STARTUP variant of the export macro; defaults to defaultBatchSize.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(bgSyncOplogFetcherBatchSize, int, defaultBatchSize);

// The batchSize to use for the find/getMore queries called by the rollback common point resolver.
// A batchSize of 0 means that the 'find' and 'getMore' commands will be given no batchSize.
// We set the default to 2000 to prevent the sync source from having to read too much data at once,
// and reduce the chance of a socket timeout.
// We choose 2000 for (10 minute timeout) * (60 sec / min) * (50 MB / second) / (16 MB / document),
// which is 1875, rounded up.
constexpr int defaultRollbackBatchSize = 2000;
// The validator rejects negative values with BadValue; 0 and positive values are accepted.
MONGO_EXPORT_SERVER_PARAMETER(rollbackRemoteOplogQueryBatchSize, int, defaultRollbackBatchSize)
    ->withValidator([](const auto& potentialNewValue) {
        if (potentialNewValue < 0) {
            return Status(ErrorCodes::BadValue,
                          "rollbackRemoteOplogQueryBatchSize cannot be negative.");
        }
        return Status::OK();
    });

// If 'forceRollbackViaRefetch' is true, always perform rollbacks via the refetch algorithm, even if
// the storage engine supports rollback via recover to timestamp.
// The value is read with load() each time a rollback chooses its method.
constexpr bool forceRollbackViaRefetchByDefault = false;
MONGO_EXPORT_SERVER_PARAMETER(forceRollbackViaRefetch, bool, forceRollbackViaRefetchByDefault);
/**
 * A DataReplicatorExternalStateImpl that also consults the owning BackgroundSync, so the oplog
 * fetcher stops as soon as the producer wants it to (for example, because the producer is no
 * longer Running or has no sync source).
 */
class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalStateImpl {
public:
    DataReplicatorExternalStateBackgroundSync(
        ReplicationCoordinator* replicationCoordinator,
        ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
        BackgroundSync* bgsync);

    /**
     * True when either the BackgroundSync producer asks to stop, or the base class's own checks
     * decide the current source should be abandoned.
     */
    bool shouldStopFetching(const HostAndPort& source,
                            const rpc::ReplSetMetadata& replMetadata,
                            boost::optional oqMetadata) override;

private:
    // Producer consulted on every stop check; not deleted by this class.
    BackgroundSync* _bgsync;
};

DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackgroundSync(
    ReplicationCoordinator* replCoord,
    ReplicationCoordinatorExternalState* replCoordExternalState,
    BackgroundSync* owner)
    : DataReplicatorExternalStateImpl(replCoord, replCoordExternalState), _bgsync(owner) {}

bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
    const HostAndPort& source,
    const rpc::ReplSetMetadata& replMetadata,
    boost::optional oqMetadata) {
    // The producer's stop conditions are checked first; the base-class checks only run when the
    // producer is happy to keep fetching.
    return _bgsync->shouldStopFetching() ||
        DataReplicatorExternalStateImpl::shouldStopFetching(source, replMetadata, oqMetadata);
}
/**
 * Returns the size of 'o' in bytes, converted explicitly to size_t.
 *
 * NOTE(review): this helper is not referenced elsewhere in this translation unit as shown.
 */
size_t getSize(const BSONObj& o) {
    // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion.
    // static_cast needs an explicit target type; without one the expression is ill-formed.
    return static_cast<size_t>(o.objsize());
}
} // namespace
// Failpoint which causes rollback to hang before starting.
// While enabled, _runRollback() polls once per second until the fail point is disabled or the
// producer is shutting down.
MONGO_FAIL_POINT_DEFINE(rollbackHangBeforeStart);

/**
 * Stores the collaborators used by the producer. No thread is started here; see startup().
 * NOTE(review): the pointers are stored as-is and presumably owned by the caller — confirm.
 */
BackgroundSync::BackgroundSync(
    ReplicationCoordinator* replicationCoordinator,
    ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
    ReplicationProcess* replicationProcess,
    OplogApplier* oplogApplier)
    : _oplogApplier(oplogApplier),
      _replCoord(replicationCoordinator),
      _replicationCoordinatorExternalState(replicationCoordinatorExternalState),
      _replicationProcess(replicationProcess) {}
void BackgroundSync::startup(OperationContext* opCtx) {
    // Only one producer thread may ever be launched.
    invariant(!_producerThread);

    // The producer thread executes _run(), which loops until shutdown() is requested.
    auto producerBody = [this] { _run(); };
    _producerThread.reset(new stdx::thread(producerBody));
}
void BackgroundSync::shutdown(OperationContext* opCtx) {
    stdx::lock_guard lk(_mutex);

    // Stop producing, then interrupt whichever asynchronous components currently exist.
    _state = ProducerState::Stopped;

    if (_syncSourceResolver)
        _syncSourceResolver->shutdown();
    if (_oplogFetcher)
        _oplogFetcher->shutdown();
    if (_rollback)
        _rollback->shutdown();

    // Observed through inShutdown(), which terminates the loop in _run().
    _inShutdown = true;
}
void BackgroundSync::join(OperationContext* opCtx) {
    // Block until the producer thread launched by startup() has exited.
    _producerThread->join();
}

bool BackgroundSync::inShutdown() const {
    stdx::lock_guard lk(_mutex);
    const bool shuttingDown = _inShutdown_inlock();
    return shuttingDown;
}

bool BackgroundSync::_inShutdown_inlock() const {
    // Meant to be called with _mutex already held, as inShutdown() does.
    return _inShutdown;
}
void BackgroundSync::_run() {
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
while (!inShutdown()) {
try {
_runProducer();
} catch (const DBException& e) {
std::string msg(str::stream() << "sync producer problem: " << redact(e));
error() << msg;
_replCoord->setMyHeartbeatMessage(msg);
sleepmillis(100); // sleep a bit to keep from hammering this thread with temp. errors.
} catch (const std::exception& e2) {
// redact(std::exception&) doesn't work
severe() << "sync producer exception: " << redact(e2.what());
fassertFailed(28546);
}
}
// No need to reset optimes here because we are shutting down.
stop(false);
}
void BackgroundSync::_runProducer() {
    // A stopped producer idles until someone moves it back to Starting.
    if (getState() == ProducerState::Stopped) {
        sleepsecs(1);
        return;
    }

    // The producer must never be running while the node is in ROLLBACK or STARTUP.
    auto memberState = _replCoord->getMemberState();
    invariant(!memberState.rollback());
    invariant(!memberState.startup());

    // Initial sync has not started yet while there is no last applied optime.
    const bool awaitingInitialSync = _replCoord->getMyLastAppliedOpTime().isNull();
    if (awaitingInitialSync) {
        sleepsecs(1);
        return;
    }

    // We want to start when we're no longer primary. start() also loads _lastOpTimeFetched, which
    // is known to be available because the last applied optime is non-null (checked above).
    {
        auto opCtx = cc().makeOperationContext();
        if (getState() == ProducerState::Starting) {
            start(opCtx.get());
        }
    }

    _produce();
}
/**
 * Performs one pass of the replication producer:
 *  1. Chooses a sync source with a SyncSourceResolver, starting from the last fetched optime
 *     (and requiring minValid when it is ahead of that optime).
 *  2. Handles the "all sources too stale" case, by aborting catch-up when primary or by entering
 *     maintenance mode / RECOVERING otherwise. It also handles the "no source found" case, and
 *     leaves maintenance mode once a usable source is found again.
 *  3. Runs an OplogFetcher against the chosen source. The fetcher's callback
 *     (_enqueueDocuments) buffers fetched entries for the applier.
 *  4. Reacts to the fetcher's final status: OplogStartMissing starts rollback, InvalidBSON
 *     blacklists the source for 60 seconds, and other errors are logged.
 *
 * Every early return just ends this pass; _run() keeps calling _runProducer(), which calls this
 * again, until shutdown.
 */
void BackgroundSync::_produce() {
    if (MONGO_FAIL_POINT(stopReplProducer)) {
        // This log output is used in js tests so please leave it.
        log() << "bgsync - stopReplProducer fail point "
                 "enabled. Blocking until fail point is disabled.";
        // TODO(SERVER-27120): Remove the return statement and uncomment the while loop.
        // Currently we cannot block here or we prevent primaries from being fully elected since
        // we'll never call _signalNoNewDataForApplier.
        // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) {
        // mongo::sleepsecs(1);
        // }
        mongo::sleepsecs(1);
        return;
    }
    // this oplog reader does not do a handshake because we don't want the server it's syncing
    // from to track how far it has synced
    // The locals below hold copies of _mutex-protected state, so later steps can run unlocked.
    HostAndPort oldSource;
    OpTime lastOpTimeFetched;
    HostAndPort source;
    SyncSourceResolverResponse syncSourceResp;
    {
        stdx::unique_lock lock(_mutex);
        if (_lastOpTimeFetched.isNull()) {
            // then we're initial syncing and we're still waiting for this to be set
            lock.unlock();
            sleepsecs(1);
            // Nothing can be fetched yet; retry on a later producer pass.
            return;
        }
        if (_state != ProducerState::Running) {
            return;
        }
        oldSource = _syncSourceHost;
    }
    // find a target to sync from the last optime fetched
    {
        OpTime minValidSaved;
        {
            auto opCtx = cc().makeOperationContext();
            minValidSaved = _replicationProcess->getConsistencyMarkers()->getMinValid(opCtx.get());
        }
        stdx::lock_guard lock(_mutex);
        if (_state != ProducerState::Running) {
            return;
        }
        // Pass minValid as the required optime only when it is ahead of what we have fetched;
        // otherwise pass a null OpTime (presumably "no requirement" to the resolver — confirm).
        const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
        lastOpTimeFetched = _lastOpTimeFetched;
        if (!_syncSourceHost.empty()) {
            log() << "Clearing sync source " << _syncSourceHost << " to choose a new one.";
        }
        _syncSourceHost = HostAndPort();
        // The resolver reports its result by writing into the local 'syncSourceResp'; it is only
        // read after join() below.
        _syncSourceResolver = stdx::make_unique(
            _replicationCoordinatorExternalState->getTaskExecutor(),
            _replCoord,
            lastOpTimeFetched,
            requiredOpTime,
            [&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
    }
    // This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls
    // ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to
    // acquired before BackgroundSync's.
    // It is safe to call startup() outside the mutex on this instance of SyncSourceResolver because
    // we do not destroy this instance outside of this function which is only called from a single
    // thread.
    auto status = _syncSourceResolver->startup();
    if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
        return;
    }
    fassert(40349, status);
    _syncSourceResolver->join();
    {
        stdx::lock_guard lock(_mutex);
        _syncSourceResolver.reset();
    }
    if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
        // All (accessible) sync sources were too stale.
        if (_replCoord->getMemberState().primary()) {
            warning() << "Too stale to catch up.";
            log() << "Our newest OpTime : " << lastOpTimeFetched;
            log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen
                  << " from " << syncSourceResp.getSyncSource();
            _replCoord->abortCatchupIfNeeded().transitional_ignore();
            return;
        }
        // We only need to mark ourselves as too stale once.
        if (_tooStale) {
            return;
        }
        // Mark yourself as too stale.
        _tooStale = true;
        // Need to take global X lock to transition out of SECONDARY.
        auto opCtx = cc().makeOperationContext();
        Lock::GlobalWrite globalWriteLock(opCtx.get());
        error() << "too stale to catch up -- entering maintenance mode";
        log() << "Our newest OpTime : " << lastOpTimeFetched;
        log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
        log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
        // Activate maintenance mode and transition to RECOVERING.
        auto status = _replCoord->setMaintenanceMode(true);
        if (!status.isOK()) {
            warning() << "Failed to transition into maintenance mode: " << status;
        }
        status = _replCoord->setFollowerMode(MemberState::RS_RECOVERING);
        if (!status.isOK()) {
            warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
                      << ". Current state: " << _replCoord->getMemberState() << causedBy(status);
        }
        return;
    } else if (syncSourceResp.isOK() && !syncSourceResp.getSyncSource().empty()) {
        {
            stdx::lock_guard lock(_mutex);
            _syncSourceHost = syncSourceResp.getSyncSource();
            source = _syncSourceHost;
        }
        // If our sync source has not changed, it is likely caused by our heartbeat data map being
        // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting
        // for our map to update.
        if (oldSource == source) {
            log() << "Chose same sync source candidate as last time, " << source
                  << ". Sleeping for 1 second to avoid immediately choosing a new sync source for "
                     "the same reason as last time.";
            sleepsecs(1);
        } else {
            log() << "Changed sync source from "
                  << (oldSource.empty() ? std::string("empty") : oldSource.toString()) << " to "
                  << source;
        }
    } else {
        if (!syncSourceResp.isOK()) {
            log() << "failed to find sync source, received error "
                  << syncSourceResp.syncSourceStatus.getStatus();
        }
        // No sync source found.
        sleepsecs(1);
        return;
    }
    // If we find a good sync source after having gone too stale, disable maintenance mode so we can
    // transition to SECONDARY.
    if (_tooStale) {
        _tooStale = false;
        log() << "No longer too stale. Able to sync from " << source;
        auto status = _replCoord->setMaintenanceMode(false);
        if (!status.isOK()) {
            warning() << "Failed to leave maintenance mode: " << status;
        }
    }
    // Re-read the fetch position under _mutex; it is the starting point for the OplogFetcher.
    long long lastHashFetched;
    {
        stdx::lock_guard lock(_mutex);
        if (_state != ProducerState::Running) {
            return;
        }
        lastOpTimeFetched = _lastOpTimeFetched;
        lastHashFetched = _lastFetchedHash;
    }
    if (!_replCoord->getMemberState().primary()) {
        _replCoord->signalUpstreamUpdater();
    }
    // Set the applied point if unset. This is most likely the first time we've established a sync
    // source since stepping down or otherwise clearing the applied point. We need to set this here,
    // before the OplogWriter gets a chance to append to the oplog.
    {
        auto opCtx = cc().makeOperationContext();
        if (_replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx.get()).isNull()) {
            _replicationProcess->getConsistencyMarkers()->setAppliedThrough(
                opCtx.get(), _replCoord->getMyLastAppliedOpTime());
        }
    }
    // Receives the status the OplogFetcher finishes with (via its shutdown callback). The last
    // fetched optime/hash are not tracked here; _enqueueDocuments() updates them per batch.
    Status fetcherReturnStatus = Status::OK();
    DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
        _replCoord, _replicationCoordinatorExternalState, this);
    OplogFetcher* oplogFetcher;
    try {
        auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus](const Status& status) {
            fetcherReturnStatus = status;
        };
        // The construction of OplogFetcher has to be outside bgsync mutex, because it calls
        // replication coordinator.
        auto oplogFetcherPtr = stdx::make_unique(
            _replicationCoordinatorExternalState->getTaskExecutor(),
            OpTimeWithHash(lastHashFetched, lastOpTimeFetched),
            source,
            NamespaceString::kRsOplogNamespace,
            _replCoord->getConfig(),
            _replicationCoordinatorExternalState->getOplogFetcherSteadyStateMaxFetcherRestarts(),
            syncSourceResp.rbid,
            true /* requireFresherSyncSource */,
            &dataReplicatorExternalState,
            [this](const auto& a1, const auto& a2, const auto& a3) {
                return this->_enqueueDocuments(a1, a2, a3);
            },
            onOplogFetcherShutdownCallbackFn,
            bgSyncOplogFetcherBatchSize);
        // Publish the fetcher under _mutex only if we are still Running, so that stop() and
        // shutdown() (which read _oplogFetcher under _mutex) can interrupt it.
        stdx::lock_guard lock(_mutex);
        if (_state != ProducerState::Running) {
            return;
        }
        _oplogFetcher = std::move(oplogFetcherPtr);
        oplogFetcher = _oplogFetcher.get();
    } catch (const mongo::DBException&) {
        fassertFailedWithStatus(34440, exceptionToStatus());
    }
    const auto logLevel = getTestCommandsEnabled() ? 0 : 1;
    LOG(logLevel) << "scheduling fetcher to read remote oplog on " << source << " starting at "
                  << oplogFetcher->getFindQuery_forTest()["filter"];
    auto scheduleStatus = oplogFetcher->startup();
    if (!scheduleStatus.isOK()) {
        warning() << "unable to schedule fetcher to read remote oplog on " << source << ": "
                  << scheduleStatus;
        return;
    }
    // Blocks this thread until the fetcher finishes; fetcherReturnStatus is set by then.
    oplogFetcher->join();
    LOG(1) << "fetcher stopped reading remote oplog on " << source;
    // If the background sync is stopped after the fetcher is started, we need to
    // re-evaluate our sync source and oplog common point.
    if (getState() != ProducerState::Running) {
        log() << "Replication producer stopped after oplog fetcher finished returning a batch from "
                 "our sync source. Abandoning this batch of oplog entries and re-evaluating our "
                 "sync source.";
        return;
    }
    if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) {
        // This is bad because it means that our source
        // has not returned oplog entries in ascending ts order, and they need to be.
        warning() << redact(fetcherReturnStatus);
        // Do not blacklist the server here, it will be blacklisted when we try to reuse it,
        // if it can't return a matching oplog start from the last fetch oplog ts field.
        return;
    } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
        auto opCtx = cc().makeOperationContext();
        auto storageInterface = StorageInterface::get(opCtx.get());
        _runRollback(
            opCtx.get(), fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface);
    } else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
        Seconds blacklistDuration(60);
        warning() << "Fetcher got invalid BSON while querying oplog. Blacklisting sync source "
                  << source << " for " << blacklistDuration << ".";
        _replCoord->blacklistSyncSource(source, Date_t::now() + blacklistDuration);
    } else if (!fetcherReturnStatus.isOK()) {
        warning() << "Fetcher stopped querying remote oplog with error: "
                  << redact(fetcherReturnStatus);
    }
}
Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
                                         Fetcher::Documents::const_iterator end,
                                         const OplogFetcher::DocumentsInfo& info) {
    // On the first batch of a query the first document (already applied) is skipped, so
    // "toApplyDocumentCount" is one fewer than "networkDocumentCount" and may be zero.
    if (info.toApplyDocumentCount == 0) {
        return Status::OK();  // Nothing to do.
    }

    auto opCtx = cc().makeOperationContext();

    // Block until the applier's buffer has room for this batch.
    _oplogApplier->getBuffer()->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);

    {
        // Keep _mutex held across the state check and the enqueue. stop() and shutdown() also
        // take _mutex to change the state, so they cannot interleave between the two.
        stdx::lock_guard lock(_mutex);
        if (_state != ProducerState::Running) {
            return Status::OK();
        }

        // Buffer the documents for later application, then advance the last fetched position.
        _oplogApplier->enqueue(opCtx.get(), begin, end);
        _lastFetchedHash = info.lastDocument.value;
        _lastOpTimeFetched = info.lastDocument.opTime;
        LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched;
    }

    // A small batch (by network bytes) suggests we are caught up. On a very low latency network,
    // pausing briefly lets operations accumulate instead of arriving almost one at a time, which
    // would be expensive for the upstream server and defeat parallel batch application here.
    const bool isSmallBatch =
        info.networkDocumentBytes > 0 && info.networkDocumentBytes < kSmallBatchLimitBytes;
    if (isSmallBatch) {
        sleepmillis(kSleepToAllowBatchingMillis);
    }

    return Status::OK();
}
/**
 * Runs rollback after the OplogFetcher stopped with OplogStartMissing ('fetcherReturnStatus' is
 * logged as the reason) while syncing from 'source'.
 *
 * If this node is primary (catch-up mode), catch-up is aborted instead and nothing is rolled back.
 * Otherwise the function takes these steps:
 *  1. Waits until everything fetched so far has been applied. It returns early if the producer
 *     leaves the Running state meanwhile.
 *  2. Rolls back against 'source'. It uses recover-to-stable-timestamp when the storage engine
 *     supports it and 'forceRollbackViaRefetch' is false, and the refetch algorithm otherwise.
 *     'requiredRBID' is only passed to the refetch path.
 *  3. Stops the producer, resetting the last fetched optime, and moves it back to Starting.
 */
void BackgroundSync::_runRollback(OperationContext* opCtx,
                                  const Status& fetcherReturnStatus,
                                  const HostAndPort& source,
                                  int requiredRBID,
                                  StorageInterface* storageInterface) {
    if (_replCoord->getMemberState().primary()) {
        warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode.";
        _replCoord->abortCatchupIfNeeded().transitional_ignore();
        return;
    }
    ShouldNotConflictWithSecondaryBatchApplicationBlock noConflict(opCtx->lockState());
    // Explicitly start future read transactions without a timestamp.
    opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
    // Rollback is a synchronous operation that uses the task executor and may not be
    // executed inside the fetcher callback.
    OpTime lastOpTimeFetched;
    {
        stdx::lock_guard lock(_mutex);
        lastOpTimeFetched = _lastOpTimeFetched;
    }
    log() << "Starting rollback due to " << redact(fetcherReturnStatus);
    log() << "Replication commit point: " << _replCoord->getLastCommittedOpTime();
    // TODO: change this to call into the Applier directly to block until the applier is
    // drained.
    //
    // Wait till all buffered oplog entries have drained and been applied.
    // Polls every 10ms; gives up (without rolling back) if the producer is stopped meanwhile.
    auto lastApplied = _replCoord->getMyLastAppliedOpTime();
    if (lastApplied != lastOpTimeFetched) {
        log() << "Waiting for all operations from " << lastApplied << " until " << lastOpTimeFetched
              << " to be applied before starting rollback.";
        while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) {
            sleepmillis(10);
            if (getState() != ProducerState::Running) {
                return;
            }
        }
    }
    if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
        // This log output is used in js tests so please leave it.
        log() << "rollback - rollbackHangBeforeStart fail point "
                 "enabled. Blocking until fail point is disabled.";
        while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
            mongo::sleepsecs(1);
        }
    }
    OplogInterfaceLocal localOplog(opCtx, NamespaceString::kRsOplogNamespace.ns());
    // 'connectionPool' is declared before 'connection', so the pooled connection is destroyed
    // (returned) before the pool itself goes away.
    const int messagingPortTags = 0;
    ConnectionPool connectionPool(messagingPortTags);
    std::unique_ptr connection;
    // Lazily opens one pooled connection to 'source' (using the rollback socket timeout) on first
    // use and hands out the same connection afterwards.
    // NOTE(review): captures locals by reference; confirm nothing keeps this callable past the
    // end of this function (e.g. via the '_rollback' member).
    auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* {
        if (!connection.get()) {
            connection.reset(new ConnectionPool::ConnectionPtr(
                &connectionPool, source, Date_t::now(), kRollbackOplogSocketTimeout));
        };
        return connection->get();
    };
    // Because oplog visibility is updated asynchronously, wait until all uncommitted oplog entries
    // are visible before potentially truncating the oplog.
    storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx);
    auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
    if (!forceRollbackViaRefetch.load() && storageEngine->supportsRecoverToStableTimestamp()) {
        log() << "Rollback using 'recoverToStableTimestamp' method.";
        _runRollbackViaRecoverToCheckpoint(
            opCtx, source, &localOplog, storageInterface, getConnection);
    } else {
        log() << "Rollback using the 'rollbackViaRefetch' method.";
        _fallBackOnRollbackViaRefetch(opCtx, source, requiredRBID, &localOplog, getConnection);
    }
    // Reset the producer to clear the sync source and the last optime fetched.
    stop(true);
    startProducerIfStopped();
}
/**
 * Rolls back against 'source' using the storage engine's recover-to-stable-timestamp support.
 *
 * Returns without rolling back if the producer is no longer Running. An
 * UnrecoverableRollbackError is fatal (fassert). Any other failure is logged as retryable and this
 * function returns normally; the caller then stops and restarts the producer.
 */
void BackgroundSync::_runRollbackViaRecoverToCheckpoint(
    OperationContext* opCtx,
    const HostAndPort& source,
    OplogInterface* localOplog,
    StorageInterface* storageInterface,
    OplogInterfaceRemote::GetConnectionFn getConnection) {
    OplogInterfaceRemote remoteOplog(source,
                                     getConnection,
                                     NamespaceString::kRsOplogNamespace.ns(),
                                     rollbackRemoteOplogQueryBatchSize.load());

    // Construct the rollback object outside of _mutex (as is done for the OplogFetcher), then
    // publish it in the same critical section that checks the producer state. shutdown() reads
    // '_rollback' under _mutex, so two things must hold. First, assigning it without the lock
    // would race with that read. Second, checking the state in a separate critical section would
    // let a shutdown() that lands in between miss this rollback entirely.
    // NOTE(review): the rollback object presumably keeps 'localOplog' and '&remoteOplog' (a local
    // of this function) after it returns; confirm nothing dereferences them through '_rollback'
    // afterwards.
    auto rollback = stdx::make_unique(
        localOplog, &remoteOplog, storageInterface, _replicationProcess, _replCoord);
    {
        stdx::lock_guard lock(_mutex);
        if (_state != ProducerState::Running) {
            return;
        }
        _rollback = std::move(rollback);
    }

    log() << "Scheduling rollback (sync source: " << source << ")";
    // '_rollback' is only assigned on this thread, so reading it here without _mutex is safe.
    auto status = _rollback->runRollback(opCtx);
    if (status.isOK()) {
        log() << "Rollback successful.";
    } else if (status == ErrorCodes::UnrecoverableRollbackError) {
        severe() << "Rollback failed with unrecoverable error: " << status;
        fassertFailedWithStatusNoTrace(50666, status);
    } else {
        warning() << "Rollback failed with retryable error: " << status;
    }
}
void BackgroundSync::_fallBackOnRollbackViaRefetch(
    OperationContext* opCtx,
    const HostAndPort& source,
    int requiredRBID,
    OplogInterface* localOplog,
    OplogInterfaceRemote::GetConnectionFn getConnection) {
    // Remote side of the refetch-based rollback, querying the sync source's oplog with the
    // configured find/getMore batchSize.
    const auto remoteBatchSize = rollbackRemoteOplogQueryBatchSize.load();
    RollbackSourceImpl remoteSource(
        getConnection, source, NamespaceString::kRsOplogNamespace.ns(), remoteBatchSize);

    rollback(opCtx, *localOplog, remoteSource, requiredRBID, _replCoord, _replicationProcess);
}
HostAndPort BackgroundSync::getSyncTarget() const {
    // Returns a copy of the current sync source (empty when none is selected), read under _mutex.
    stdx::lock_guard lk(_mutex);
    return _syncSourceHost;
}

void BackgroundSync::clearSyncTarget() {
    stdx::lock_guard lk(_mutex);
    log() << "Resetting sync source to empty, which was " << _syncSourceHost;
    // Forget the current source; the producer resolves a new one on its next pass.
    _syncSourceHost = HostAndPort();
}
void BackgroundSync::stop(bool resetLastFetchedOptime) {
    stdx::lock_guard lk(_mutex);

    _state = ProducerState::Stopped;
    log() << "Stopping replication producer";

    // Drop the current sync source; a new one is chosen the next time the producer runs.
    _syncSourceHost = HostAndPort();

    if (resetLastFetchedOptime) {
        // Forgetting the fetch position is only allowed once every buffered operation is gone.
        invariant(_oplogApplier->getBuffer()->isEmpty());
        _lastOpTimeFetched = OpTime();
        _lastFetchedHash = 0;
        log() << "Resetting last fetched optimes in bgsync";
    }

    // Interrupt an in-progress sync source resolution and oplog fetch, if there are any.
    if (_syncSourceResolver)
        _syncSourceResolver->shutdown();
    if (_oplogFetcher)
        _oplogFetcher->shutdown();
}
/**
 * Moves the producer from Starting to Running. When the last fetched optime is not ahead of the
 * newest local oplog entry, it seeds the last fetched optime/hash from that entry.
 *
 * Returns early, without changing anything, if the producer is no longer in the Starting state
 * once _mutex is acquired.
 */
void BackgroundSync::start(OperationContext* opCtx) {
    OpTimeWithHash lastAppliedOpTimeWithHash;
    ShouldNotConflictWithSecondaryBatchApplicationBlock noConflict(opCtx->lockState());

    // Explicitly start future read transactions without a timestamp.
    opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);

    do {
        lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(opCtx);
        stdx::lock_guard lk(_mutex);
        // Double check the state after acquiring the mutex.
        // NOTE(review): the first pass sets _state to Running below, so if the loop condition
        // triggers a second pass, it returns here before the reload takes effect. Confirm whether
        // the retry is meant to accept the Running state.
        if (_state != ProducerState::Starting) {
            return;
        }
        // If a node steps down during drain mode, then the buffer may not be empty at the beginning
        // of secondary state.
        if (!_oplogApplier->getBuffer()->isEmpty()) {
            log() << "going to start syncing, but buffer is not empty";
        }

        _state = ProducerState::Running;

        // When a node steps down during drain mode, the last fetched optime would be newer than
        // the last applied.
        if (_lastOpTimeFetched <= lastAppliedOpTimeWithHash.opTime) {
            LOG(1) << "Setting bgsync _lastOpTimeFetched=" << lastAppliedOpTimeWithHash.opTime
                   << " and _lastFetchedHash=" << lastAppliedOpTimeWithHash.value
                   << ". Previous _lastOpTimeFetched: " << _lastOpTimeFetched;
            _lastOpTimeFetched = lastAppliedOpTimeWithHash.opTime;
            _lastFetchedHash = lastAppliedOpTimeWithHash.value;
        }
        // Reload the last applied optime from disk if it has been changed.
    } while (lastAppliedOpTimeWithHash.opTime != _replCoord->getMyLastAppliedOpTime());

    // stop() and _enqueueDocuments() write _lastOpTimeFetched and _lastFetchedHash while holding
    // _mutex. Read them under _mutex here too, rather than racing with those writers.
    stdx::lock_guard lk(_mutex);
    LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* opCtx) {
    const auto& oplogNs = NamespaceString::kRsOplogNamespace.ns();

    // Fetch the newest oplog entry, retrying on write conflicts.
    BSONObj lastEntry;
    try {
        const bool found = writeConflictRetry(opCtx, "readLastAppliedHash", oplogNs, [&] {
            Lock::DBLock lk(opCtx, "local", MODE_X);
            return Helpers::getLast(opCtx, oplogNs.c_str(), lastEntry);
        });
        if (!found) {
            // An empty oplog can happen when we are about to do an initial sync; the last hash
            // is set once initial sync completes.
            return OpTimeWithHash(0);
        }
    } catch (const DBException& ex) {
        severe() << "Problem reading " << oplogNs << ": " << redact(ex);
        fassertFailed(18904);
    }

    // The entry must carry an integral hash field; anything else is fatal.
    long long lastHash;
    const auto hashStatus = bsonExtractIntegerField(lastEntry, kHashFieldName, &lastHash);
    if (!hashStatus.isOK()) {
        severe() << "Most recent entry in " << oplogNs << " is missing or has invalid \""
                 << kHashFieldName << "\" field. Oplog entry: " << redact(lastEntry) << ": "
                 << redact(hashStatus);
        fassertFailed(18902);
    }

    OplogEntry parsed(lastEntry);
    const OpTimeWithHash result(lastHash, parsed.getOpTime());
    LOG(1) << "Successfully read last entry of oplog while starting bgsync: " << redact(lastEntry);
    return result;
}
bool BackgroundSync::shouldStopFetching() const {
    // Any state other than Running means a stop has been requested.
    if (getState() != ProducerState::Running) {
        LOG(2) << "Stopping oplog fetcher due to stop request.";
        return true;
    }

    // Without a selected sync source there is nothing valid to fetch from.
    const bool haveSyncSource = !getSyncTarget().empty();
    if (!haveSyncSource) {
        LOG(1) << "Stopping oplog fetcher; canceling oplog query because we have no valid sync "
                  "source.";
    }
    return !haveSyncSource;
}

BackgroundSync::ProducerState BackgroundSync::getState() const {
    stdx::lock_guard lk(_mutex);
    const ProducerState snapshot = _state;
    return snapshot;
}

void BackgroundSync::startProducerIfStopped() {
    stdx::lock_guard lk(_mutex);
    // Only a Stopped producer is moved to Starting; any other state is left unchanged.
    if (_state != ProducerState::Stopped) {
        return;
    }
    _state = ProducerState::Starting;
}
} // namespace repl
} // namespace mongo