/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/base/counter.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_pool.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/data_replicator_external_state_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/s/shard_identity_rollback_notifier.h"
#include "mongo/db/server_parameters.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
namespace mongo {
using std::string;
namespace repl {
namespace {
const char kHashFieldName[] = "h";
const int kSleepToAllowBatchingMillis = 2;
const int kSmallBatchLimitBytes = 40000;
const Milliseconds kRollbackOplogSocketTimeout(10 * 60 * 1000);
// Set this to true to force rollbacks to use the 3.4 implementation.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(use3dot4Rollback, bool, true);
/**
* Extends DataReplicatorExternalStateImpl to be member state aware.
*/
class DataReplicatorExternalStateBackgroundSync : public DataReplicatorExternalStateImpl {
public:
DataReplicatorExternalStateBackgroundSync(
ReplicationCoordinator* replicationCoordinator,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
BackgroundSync* bgsync);
bool shouldStopFetching(const HostAndPort& source,
const rpc::ReplSetMetadata& replMetadata,
boost::optional oqMetadata) override;
private:
BackgroundSync* _bgsync;
};
DataReplicatorExternalStateBackgroundSync::DataReplicatorExternalStateBackgroundSync(
ReplicationCoordinator* replicationCoordinator,
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
BackgroundSync* bgsync)
: DataReplicatorExternalStateImpl(replicationCoordinator, replicationCoordinatorExternalState),
_bgsync(bgsync) {}
bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
const HostAndPort& source,
const rpc::ReplSetMetadata& replMetadata,
boost::optional oqMetadata) {
if (_bgsync->shouldStopFetching()) {
return true;
}
return DataReplicatorExternalStateImpl::shouldStopFetching(source, replMetadata, oqMetadata);
}
size_t getSize(const BSONObj& o) {
// SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
return static_cast(o.objsize());
}
} // namespace
// Failpoint which causes rollback to hang before starting.
MONGO_FP_DECLARE(rollbackHangBeforeStart);
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField displayBufferCount("repl.buffer.count",
&bufferCountGauge);
// The size (bytes) of items in the buffer
static Counter64 bufferSizeGauge;
static ServerStatusMetricField displayBufferSize("repl.buffer.sizeBytes",
&bufferSizeGauge);
// The max size (bytes) of the buffer. If the buffer does not have a size constraint, this is
// set to 0.
static Counter64 bufferMaxSizeGauge;
static ServerStatusMetricField displayBufferMaxSize("repl.buffer.maxSizeBytes",
&bufferMaxSizeGauge);
BackgroundSync::BackgroundSync(
ReplicationCoordinatorExternalState* replicationCoordinatorExternalState,
std::unique_ptr oplogBuffer)
: _oplogBuffer(std::move(oplogBuffer)),
_replCoord(getGlobalReplicationCoordinator()),
_replicationCoordinatorExternalState(replicationCoordinatorExternalState) {
// Update "repl.buffer.maxSizeBytes" server status metric to reflect the current oplog buffer's
// max size.
bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get());
}
void BackgroundSync::startup(OperationContext* opCtx) {
_oplogBuffer->startup(opCtx);
invariant(!_producerThread);
_producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::_run, this)));
}
void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard lock(_mutex);
// Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
// ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is
// waiting for an operation to be past the slaveDelay point.
clearBuffer(opCtx);
_state = ProducerState::Stopped;
if (_syncSourceResolver) {
_syncSourceResolver->shutdown();
}
if (_oplogFetcher) {
_oplogFetcher->shutdown();
}
if (_rollback) {
_rollback->shutdown();
}
_inShutdown = true;
}
void BackgroundSync::join(OperationContext* opCtx) {
_producerThread->join();
_oplogBuffer->shutdown(opCtx);
}
bool BackgroundSync::inShutdown() const {
stdx::lock_guard lock(_mutex);
return _inShutdown_inlock();
}
bool BackgroundSync::_inShutdown_inlock() const {
return _inShutdown;
}
void BackgroundSync::_run() {
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
while (!inShutdown()) {
try {
_runProducer();
} catch (const DBException& e) {
std::string msg(str::stream() << "sync producer problem: " << redact(e));
error() << msg;
_replCoord->setMyHeartbeatMessage(msg);
sleepmillis(100); // sleep a bit to keep from hammering this thread with temp. errors.
} catch (const std::exception& e2) {
// redact(std::exception&) doesn't work
severe() << "sync producer exception: " << redact(e2.what());
fassertFailed(28546);
}
}
stop(true);
}
void BackgroundSync::_runProducer() {
if (getState() == ProducerState::Stopped) {
sleepsecs(1);
return;
}
// TODO(spencer): Use a condition variable to await loading a config.
// TODO(siyuan): Control bgsync with producer state.
if (_replCoord->getMemberState().startup()) {
// Wait for a config to be loaded
sleepsecs(1);
return;
}
invariant(!_replCoord->getMemberState().rollback());
// We need to wait until initial sync has started.
if (_replCoord->getMyLastAppliedOpTime().isNull()) {
sleepsecs(1);
return;
}
// we want to start when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
auto opCtx = cc().makeOperationContext();
if (getState() == ProducerState::Starting) {
start(opCtx.get());
}
_produce(opCtx.get());
}
void BackgroundSync::_produce(OperationContext* opCtx) {
if (MONGO_FAIL_POINT(stopReplProducer)) {
// This log output is used in js tests so please leave it.
log() << "bgsync - stopReplProducer fail point "
"enabled. Blocking until fail point is disabled.";
// TODO(SERVER-27120): Remove the return statement and uncomment the while loop.
// Currently we cannot block here or we prevent primaries from being fully elected since
// we'll never call _signalNoNewDataForApplier.
// while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) {
// mongo::sleepsecs(1);
// }
mongo::sleepsecs(1);
return;
}
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
stdx::unique_lock lock(_mutex);
if (_lastOpTimeFetched.isNull()) {
// then we're initial syncing and we're still waiting for this to be set
lock.unlock();
sleepsecs(1);
// if there is no one to sync from
return;
}
if (_state != ProducerState::Running) {
return;
}
}
auto storageInterface = StorageInterface::get(opCtx);
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
HostAndPort source;
HostAndPort oldSource = _syncSourceHost;
SyncSourceResolverResponse syncSourceResp;
{
const OpTime minValidSaved = storageInterface->getMinValid(opCtx);
stdx::lock_guard lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
lastOpTimeFetched = _lastOpTimeFetched;
_syncSourceHost = HostAndPort();
_syncSourceResolver = stdx::make_unique(
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
requiredOpTime,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
}
// This may deadlock if called inside the mutex because SyncSourceResolver::startup() calls
// ReplicationCoordinator::chooseNewSyncSource(). ReplicationCoordinatorImpl's mutex has to
// acquired before BackgroundSync's.
// It is safe to call startup() outside the mutex on this instance of SyncSourceResolver because
// we do not destroy this instance outside of this function which is only called from a single
// thread.
auto status = _syncSourceResolver->startup();
if (ErrorCodes::CallbackCanceled == status || ErrorCodes::isShutdownError(status.code())) {
return;
}
fassertStatusOK(40349, status);
_syncSourceResolver->join();
{
stdx::lock_guard lock(_mutex);
_syncSourceResolver.reset();
}
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// All (accessible) sync sources were too stale.
if (_replCoord->getMemberState().primary()) {
warning() << "Too stale to catch up.";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen
<< " from " << syncSourceResp.getSyncSource();
_replCoord->abortCatchupIfNeeded();
return;
}
// We only need to mark ourselves as too stale once.
if (_tooStale) {
return;
}
// Mark yourself as too stale.
_tooStale = true;
error() << "too stale to catch up -- entering maintenance mode";
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
// Activate maintenance mode and transition to RECOVERING.
auto status = _replCoord->setMaintenanceMode(true);
if (!status.isOK()) {
warning() << "Failed to transition into maintenance mode: " << status;
}
bool worked = _replCoord->setFollowerMode(MemberState::RS_RECOVERING);
if (!worked) {
warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
<< ". Current state: " << _replCoord->getMemberState();
}
return;
} else if (syncSourceResp.isOK() && !syncSourceResp.getSyncSource().empty()) {
{
stdx::lock_guard lock(_mutex);
_syncSourceHost = syncSourceResp.getSyncSource();
source = _syncSourceHost;
}
// If our sync source has not changed, it is likely caused by our heartbeat data map being
// out of date. In that case we sleep for 1 second to reduce the amount we spin waiting
// for our map to update.
if (oldSource == source) {
log() << "Chose same sync source candidate as last time, " << source
<< ". Sleeping for 1 second to avoid immediately choosing a new sync source for "
"the same reason as last time.";
sleepsecs(1);
}
} else {
if (!syncSourceResp.isOK()) {
log() << "failed to find sync source, received error "
<< syncSourceResp.syncSourceStatus.getStatus();
}
// No sync source found.
sleepsecs(1);
return;
}
// If we find a good sync source after having gone too stale, disable maintenance mode so we can
// transition to SECONDARY.
if (_tooStale) {
_tooStale = false;
log() << "No longer too stale. Able to sync from " << _syncSourceHost;
auto status = _replCoord->setMaintenanceMode(false);
if (!status.isOK()) {
warning() << "Failed to leave maintenance mode: " << status;
}
}
long long lastHashFetched;
{
stdx::lock_guard lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
lastOpTimeFetched = _lastOpTimeFetched;
lastHashFetched = _lastFetchedHash;
}
if (!_replCoord->getMemberState().primary()) {
_replCoord->signalUpstreamUpdater();
}
// Set the applied point if unset. This is most likely the first time we've established a sync
// source since stepping down or otherwise clearing the applied point. We need to set this here,
// before the OplogWriter gets a chance to append to the oplog.
if (storageInterface->getAppliedThrough(opCtx).isNull()) {
storageInterface->setAppliedThrough(opCtx, _replCoord->getMyLastAppliedOpTime());
}
// "lastFetched" not used. Already set in _enqueueDocuments.
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
OplogFetcher* oplogFetcher;
try {
auto onOplogFetcherShutdownCallbackFn = [&fetcherReturnStatus](const Status& status) {
fetcherReturnStatus = status;
};
// The construction of OplogFetcher has to be outside bgsync mutex, because it calls
// replication coordinator.
auto oplogFetcherPtr = stdx::make_unique(
_replicationCoordinatorExternalState->getTaskExecutor(),
OpTimeWithHash(lastHashFetched, lastOpTimeFetched),
source,
NamespaceString(rsOplogName),
_replCoord->getConfig(),
_replicationCoordinatorExternalState->getOplogFetcherMaxFetcherRestarts(),
syncSourceResp.rbid,
true /* requireFresherSyncSource */,
&dataReplicatorExternalState,
stdx::bind(&BackgroundSync::_enqueueDocuments,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3),
onOplogFetcherShutdownCallbackFn);
stdx::lock_guard lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
_oplogFetcher = std::move(oplogFetcherPtr);
oplogFetcher = _oplogFetcher.get();
} catch (const mongo::DBException& ex) {
fassertFailedWithStatus(34440, exceptionToStatus());
}
const auto logLevel = Command::testCommandsEnabled ? 0 : 1;
LOG(logLevel) << "scheduling fetcher to read remote oplog on " << _syncSourceHost
<< " starting at " << oplogFetcher->getFindQuery_forTest()["filter"];
auto scheduleStatus = oplogFetcher->startup();
if (!scheduleStatus.isOK()) {
warning() << "unable to schedule fetcher to read remote oplog on " << source << ": "
<< scheduleStatus;
return;
}
oplogFetcher->join();
LOG(1) << "fetcher stopped reading remote oplog on " << source;
// If the background sync is stopped after the fetcher is started, we need to
// re-evaluate our sync source and oplog common point.
if (getState() != ProducerState::Running) {
log() << "Replication producer stopped after oplog fetcher finished returning a batch from "
"our sync source. Abandoning this batch of oplog entries and re-evaluating our "
"sync source.";
return;
}
if (fetcherReturnStatus.code() == ErrorCodes::OplogOutOfOrder) {
// This is bad because it means that our source
// has not returned oplog entries in ascending ts order, and they need to be.
warning() << redact(fetcherReturnStatus);
// Do not blacklist the server here, it will be blacklisted when we try to reuse it,
// if it can't return a matching oplog start from the last fetch oplog ts field.
return;
} else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
_runRollback(opCtx, fetcherReturnStatus, source, syncSourceResp.rbid, storageInterface);
} else if (fetcherReturnStatus == ErrorCodes::InvalidBSON) {
Seconds blacklistDuration(60);
warning() << "Fetcher got invalid BSON while querying oplog. Blacklisting sync source "
<< source << " for " << blacklistDuration << ".";
_replCoord->blacklistSyncSource(source, Date_t::now() + blacklistDuration);
} else if (!fetcherReturnStatus.isOK()) {
warning() << "Fetcher stopped querying remote oplog with error: "
<< redact(fetcherReturnStatus);
}
}
Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info) {
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
if (info.toApplyDocumentCount == 0) {
return Status::OK(); // Nothing to do.
}
auto opCtx = cc().makeOperationContext();
// Wait for enough space.
_oplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
{
// Don't add more to the buffer if we are in shutdown. Continue holding the lock until we
// are done to prevent going into shutdown. This avoids a race where shutdown() clears the
// buffer between the time we check _inShutdown and the point where we finish writing to the
// buffer.
stdx::unique_lock lock(_mutex);
if (_state != ProducerState::Running) {
return Status::OK();
}
OCCASIONALLY {
LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes";
}
// Buffer docs for later application.
_oplogBuffer->pushAllNonBlocking(opCtx.get(), begin, end);
// Update last fetched info.
_lastFetchedHash = info.lastDocument.value;
_lastOpTimeFetched = info.lastDocument.opTime;
LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched;
}
bufferCountGauge.increment(info.toApplyDocumentCount);
bufferSizeGauge.increment(info.toApplyDocumentBytes);
// Check some things periodically (whenever we run out of items in the current cursor batch).
if (info.networkDocumentBytes > 0 && info.networkDocumentBytes < kSmallBatchLimitBytes) {
// On a very low latency network, if we don't wait a little, we'll be
// getting ops to write almost one at a time. This will both be expensive
// for the upstream server as well as potentially defeating our parallel
// application of batches on the secondary.
//
// The inference here is basically if the batch is really small, we are "caught up".
sleepmillis(kSleepToAllowBatchingMillis);
}
return Status::OK();
}
bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) {
return _oplogBuffer->peek(opCtx, op);
}
void BackgroundSync::waitForMore() {
// Block for one second before timing out.
_oplogBuffer->waitForData(Seconds(1));
}
void BackgroundSync::consume(OperationContext* opCtx) {
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
BSONObj op;
if (_oplogBuffer->tryPop(opCtx, &op)) {
bufferCountGauge.decrement(1);
bufferSizeGauge.decrement(getSize(op));
} else {
invariant(inShutdown());
// This means that shutdown() was called between the consumer's calls to peek() and
// consume(). shutdown() cleared the buffer so there is nothing for us to consume here.
// Since our postcondition is already met, it is safe to return successfully.
}
}
void BackgroundSync::_runRollback(OperationContext* opCtx,
const Status& fetcherReturnStatus,
const HostAndPort& source,
int requiredRBID,
StorageInterface* storageInterface) {
if (_replCoord->getMemberState().primary()) {
warning() << "Rollback situation detected in catch-up mode. Aborting catch-up mode.";
_replCoord->abortCatchupIfNeeded();
return;
}
// Rollback is a synchronous operation that uses the task executor and may not be
// executed inside the fetcher callback.
OpTime lastOpTimeFetched;
{
stdx::lock_guard lock(_mutex);
lastOpTimeFetched = _lastOpTimeFetched;
}
log() << "Starting rollback due to " << redact(fetcherReturnStatus);
// TODO: change this to call into the Applier directly to block until the applier is
// drained.
//
// Wait till all buffered oplog entries have drained and been applied.
auto lastApplied = _replCoord->getMyLastAppliedOpTime();
if (lastApplied != lastOpTimeFetched) {
log() << "Waiting for all operations from " << lastApplied << " until " << lastOpTimeFetched
<< " to be applied before starting rollback.";
while (lastOpTimeFetched > (lastApplied = _replCoord->getMyLastAppliedOpTime())) {
sleepmillis(10);
if (getState() != ProducerState::Running) {
return;
}
}
}
if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
// This log output is used in js tests so please leave it.
log() << "rollback - rollbackHangBeforeStart fail point "
"enabled. Blocking until fail point is disabled.";
while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
mongo::sleepsecs(1);
}
}
OplogInterfaceLocal localOplog(opCtx, rsOplogName);
if (use3dot4Rollback) {
log() << "Rollback falling back on 3.4 algorithm due to startup server parameter";
_fallBackOn3dot4Rollback(opCtx, source, requiredRBID, &localOplog, storageInterface);
} else {
AbstractAsyncComponent* rollback;
StatusWith onRollbackShutdownResult =
Status(ErrorCodes::InternalError, "Rollback failed but didn’t return an error message");
try {
auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
auto onRollbackShutdownCallbackFn = [&onRollbackShutdownResult](
const StatusWith& lastApplied) noexcept {
onRollbackShutdownResult = lastApplied;
};
stdx::lock_guard lock(_mutex);
if (_state != ProducerState::Running) {
return;
}
_rollback = stdx::make_unique(executor,
&localOplog,
source,
requiredRBID,
_replCoord,
storageInterface,
onRollbackShutdownCallbackFn);
rollback = _rollback.get();
} catch (...) {
fassertFailedWithStatus(40401, exceptionToStatus());
}
log() << "Scheduling rollback (sync source: " << source << ")";
auto scheduleStatus = rollback->startup();
if (!scheduleStatus.isOK()) {
warning() << "Unable to schedule rollback: " << scheduleStatus;
} else {
rollback->join();
auto status = onRollbackShutdownResult.getStatus();
if (status.isOK()) {
log() << "Rollback successful. Last applied optime: "
<< onRollbackShutdownResult.getValue();
} else if (ErrorCodes::IncompatibleRollbackAlgorithm == status) {
log() << "Rollback falling back on 3.4 algorithm due to " << status;
_fallBackOn3dot4Rollback(
opCtx, source, requiredRBID, &localOplog, storageInterface);
} else {
warning() << "Rollback failed with error: " << status;
}
}
}
// Reset the producer to clear the sync source and the last optime fetched.
stop(true);
startProducerIfStopped();
}
void BackgroundSync::_fallBackOn3dot4Rollback(OperationContext* opCtx,
const HostAndPort& source,
int requiredRBID,
OplogInterface* localOplog,
StorageInterface* storageInterface) {
const int messagingPortTags = 0;
ConnectionPool connectionPool(messagingPortTags);
std::unique_ptr connection;
auto getConnection = [&connection, &connectionPool, source]() -> DBClientBase* {
if (!connection.get()) {
connection.reset(new ConnectionPool::ConnectionPtr(
&connectionPool, source, Date_t::now(), kRollbackOplogSocketTimeout));
};
return connection->get();
};
RollbackSourceImpl rollbackSource(getConnection, source, rsOplogName);
rollback(opCtx, *localOplog, rollbackSource, requiredRBID, _replCoord, storageInterface);
}
HostAndPort BackgroundSync::getSyncTarget() const {
stdx::unique_lock lock(_mutex);
return _syncSourceHost;
}
void BackgroundSync::clearSyncTarget() {
stdx::unique_lock lock(_mutex);
_syncSourceHost = HostAndPort();
}
void BackgroundSync::stop(bool resetLastFetchedOptime) {
stdx::lock_guard lock(_mutex);
_state = ProducerState::Stopped;
_syncSourceHost = HostAndPort();
if (resetLastFetchedOptime) {
invariant(_oplogBuffer->isEmpty());
_lastOpTimeFetched = OpTime();
_lastFetchedHash = 0;
}
if (_syncSourceResolver) {
_syncSourceResolver->shutdown();
}
if (_oplogFetcher) {
_oplogFetcher->shutdown();
}
}
void BackgroundSync::start(OperationContext* opCtx) {
OpTimeWithHash lastAppliedOpTimeWithHash;
do {
lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(opCtx);
stdx::lock_guard lk(_mutex);
// Double check the state after acquiring the mutex.
if (_state != ProducerState::Starting) {
return;
}
// If a node steps down during drain mode, then the buffer may not be empty at the beginning
// of secondary state.
if (!_oplogBuffer->isEmpty()) {
log() << "going to start syncing, but buffer is not empty";
}
_state = ProducerState::Running;
// When a node steps down during drain mode, the last fetched optime would be newer than
// the last applied.
if (_lastOpTimeFetched <= lastAppliedOpTimeWithHash.opTime) {
_lastOpTimeFetched = lastAppliedOpTimeWithHash.opTime;
_lastFetchedHash = lastAppliedOpTimeWithHash.value;
}
// Reload the last applied optime from disk if it has been changed.
} while (lastAppliedOpTimeWithHash.opTime != _replCoord->getMyLastAppliedOpTime());
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
void BackgroundSync::clearBuffer(OperationContext* opCtx) {
_oplogBuffer->clear(opCtx);
const auto count = bufferCountGauge.get();
bufferCountGauge.decrement(count);
const auto size = bufferSizeGauge.get();
bufferSizeGauge.decrement(size);
}
OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* opCtx) {
BSONObj oplogEntry;
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
Lock::DBLock lk(opCtx, "local", MODE_X);
bool success = Helpers::getLast(opCtx, rsOplogName.c_str(), oplogEntry);
if (!success) {
// This can happen when we are to do an initial sync. lastHash will be set
// after the initial sync is complete.
return OpTimeWithHash(0);
}
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "readLastAppliedHash", rsOplogName);
} catch (const DBException& ex) {
severe() << "Problem reading " << rsOplogName << ": " << redact(ex);
fassertFailed(18904);
}
long long hash;
auto status = bsonExtractIntegerField(oplogEntry, kHashFieldName, &hash);
if (!status.isOK()) {
severe() << "Most recent entry in " << rsOplogName << " is missing or has invalid \""
<< kHashFieldName << "\" field. Oplog entry: " << redact(oplogEntry) << ": "
<< redact(status);
fassertFailed(18902);
}
OplogEntry parsedEntry(oplogEntry);
return OpTimeWithHash(hash, parsedEntry.getOpTime());
}
bool BackgroundSync::shouldStopFetching() const {
// Check if we have been stopped.
if (getState() != ProducerState::Running) {
LOG(2) << "Stopping oplog fetcher due to stop request.";
return true;
}
// Check current sync source.
if (getSyncTarget().empty()) {
LOG(1) << "Stopping oplog fetcher; canceling oplog query because we have no valid sync "
"source.";
return true;
}
return false;
}
void BackgroundSync::pushTestOpToBuffer(OperationContext* opCtx, const BSONObj& op) {
_oplogBuffer->push(opCtx, op);
bufferCountGauge.increment();
bufferSizeGauge.increment(op.objsize());
}
BackgroundSync::ProducerState BackgroundSync::getState() const {
stdx::lock_guard lock(_mutex);
return _state;
}
void BackgroundSync::startProducerIfStopped() {
stdx::lock_guard lock(_mutex);
// Let producer run if it's already running.
if (_state == ProducerState::Stopped) {
_state = ProducerState::Starting;
}
}
} // namespace repl
} // namespace mongo