/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/bgsync.h"
#include
#include "mongo/base/counter.h"
#include "mongo/client/connection_pool.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
namespace mongo {
using std::string;
namespace repl {
namespace {
// Field name of the hash ("h") carried by every oplog entry.
const char hashFieldName[] = "h";
// Milliseconds to sleep after fetching a small batch, giving the sync source
// time to accumulate ops so subsequent getMores return larger batches.
int SleepToAllowBatchingMillis = 2;
// Threshold below which a fetched batch counts as "small" for the sleep above.
const int BatchIsSmallish = 40000;  // bytes
/**
* Returns new thread pool for thead pool task executor.
*/
std::unique_ptr makeThreadPool() {
ThreadPool::Options threadPoolOptions;
threadPoolOptions.poolName = "rsBackgroundSync";
return stdx::make_unique(threadPoolOptions);
}
/**
 * Checks the criteria for rolling back.
 * 'getNextOperation' returns the first result of the oplog tailing query.
 * 'lastOpTimeFetched' and 'lastHashFetched' should be consistent with the
 * predicate in that query.
 * Returns RemoteOplogStale if the oplog query has no results.
 * Returns OplogStartMissing if the first remote entry's optime/hash does not
 * match the last operation we fetched.
 * Returns OK when the remote oplog's start lines up with our last fetched op.
 */
Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperation,
                             OpTime lastOpTimeFetched,
                             long long lastHashFetched) {
    auto result = getNextOperation();

    if (!result.isOK()) {
        // The GTE query from upstream returns nothing, so we're ahead of the upstream.
        return Status(ErrorCodes::RemoteOplogStale,
                      "we are ahead of the sync source, will try to roll back");
    }
    BSONObj o = result.getValue();
    OpTime opTime = fassertStatusOK(28778, OpTime::parseFromOplogEntry(o));
    // Use the shared field-name constant rather than a stray "h" literal,
    // consistent with _readLastAppliedHash().
    long long hash = o[hashFieldName].numberLong();
    if (opTime != lastOpTimeFetched || hash != lastHashFetched) {
        return Status(ErrorCodes::OplogStartMissing,
                      str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString()
                                    << ". source's GTE: " << opTime.toString());
    }
    return Status::OK();
}
} // namespace
MONGO_FP_DECLARE(rsBgSyncProduce);
MONGO_FP_DECLARE(stepDownWhileDrainingFailPoint);

// Lazily-created singleton instance and the mutex guarding its creation.
BackgroundSync* BackgroundSync::s_instance = nullptr;
stdx::mutex BackgroundSync::s_mutex;

// The number and time spent reading batches off the network
static TimerStats getmoreReplStats;
static ServerStatusMetricField<TimerStats> displayBatchesReceived("repl.network.getmores",
                                                                  &getmoreReplStats);
// The oplog entries read via the oplog reader
static Counter64 opsReadStats;
static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &opsReadStats);
// The bytes read via the oplog reader
static Counter64 networkByteStats;
static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes",
                                                           &networkByteStats);
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
                                                             &bufferCountGauge);
// The size (bytes) of items in the buffer
static Counter64 bufferSizeGauge;
static ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes",
                                                            &bufferSizeGauge);
// The max size (bytes) of the buffer
static int bufferMaxSizeGauge = 256 * 1024 * 1024;
static ServerStatusMetricField<int> displayBufferMaxSize("repl.buffer.maxSizeBytes",
                                                         &bufferMaxSizeGauge);
// Out-of-line destructor definition anchors the interface's vtable in this TU.
BackgroundSyncInterface::~BackgroundSyncInterface() = default;
namespace {
// Sizing callback for the fetch buffer: reports an object's BSON size.
size_t getSize(const BSONObj& o) {
    // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
    return static_cast<size_t>(o.objsize());
}
} // namespace
// Constructs bgsync in the paused state; start() later seeds the real fetch
// position from the local oplog.
BackgroundSync::BackgroundSync()
    : _buffer(bufferMaxSizeGauge, &getSize),
      _threadPoolTaskExecutor(makeThreadPool(), executor::makeNetworkInterface()),
      // Seed with the maximum possible timestamp/term so comparisons treat us
      // as having nothing valid fetched until start() loads the real value.
      _lastOpTimeFetched(Timestamp(std::numeric_limits::max(), 0),
                         std::numeric_limits::max()),
      _lastFetchedHash(0),
      // Begin paused; _producerThread() unpauses once we are a syncing secondary.
      _pause(true),
      _appliedBuffer(true),
      _replCoord(getGlobalReplicationCoordinator()),
      _initialSyncRequestedFlag(false),
      _indexPrefetchConfig(PREFETCH_ALL) {}
// Returns the lazily-created singleton. During shutdown no new instance is
// created, so this may return nullptr; callers must tolerate that.
BackgroundSync* BackgroundSync::get() {
    stdx::unique_lock<stdx::mutex> lock(s_mutex);
    if (s_instance == nullptr && !inShutdown()) {
        s_instance = new BackgroundSync();
    }
    return s_instance;
}
// Transitions bgsync into its shut-down state: empties the fetch buffer,
// marks the producer paused, and wakes any thread blocked on our condition
// variables. Must only run once global shutdown has begun (see invariant).
void BackgroundSync::shutdown() {
    stdx::lock_guard lock(_mutex);

    // Clear the buffer in case the producerThread is waiting in push() due to a full queue.
    invariant(inShutdown());
    _buffer.clear();
    _pause = true;

    // Wake up producerThread so it notices that we're in shutdown
    _appliedBufferCondition.notify_all();
    _pausedCondition.notify_all();
}
// Called after ops have been applied ('txn' is unused here). If the buffer
// has fully drained, records that and wakes waiters on _appliedBufferCondition.
void BackgroundSync::notify(OperationContext* txn) {
    stdx::lock_guard lock(_mutex);

    // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting)
    if (_buffer.empty()) {
        _appliedBuffer = true;
        _appliedBufferCondition.notify_all();
    }
}
// Entry point for the "rsBackgroundSync" thread: starts the task executor and
// loops calling _producerThread() until shutdown. DBExceptions are surfaced
// via the heartbeat message and retried; any other exception is fatal.
void BackgroundSync::producerThread() {
    Client::initThread("rsBackgroundSync");
    AuthorizationSession::get(cc())->grantInternalAuthorization();

    _threadPoolTaskExecutor.startup();
    // Guarantee the executor is stopped and drained on every exit path.
    ON_BLOCK_EXIT([this]() {
        _threadPoolTaskExecutor.shutdown();
        _threadPoolTaskExecutor.join();
    });

    while (!inShutdown()) {
        try {
            _producerThread();
        } catch (const DBException& e) {
            std::string msg(str::stream() << "sync producer problem: " << e.toString());
            error() << msg;
            _replCoord->setMyHeartbeatMessage(msg);
        } catch (const std::exception& e2) {
            // Non-DBException: crash rather than continue in an unknown state.
            severe() << "sync producer exception: " << e2.what();
            fassertFailed(28546);
        }
    }

    stop();
}
// One iteration of the producer loop: based on current member state, either
// pauses, waits (startup / pre-initial-sync), or runs a fetch pass.
void BackgroundSync::_producerThread() {
    const MemberState state = _replCoord->getMemberState();
    // we want to pause when the state changes to primary
    if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
        if (!isPaused()) {
            stop();
        }
        if (_replCoord->isWaitingForApplierToDrain() && _buffer.empty()) {
            // This will wake up the applier if it is sitting in blockingPeek().
            _buffer.clear();
        }
        sleepsecs(1);
        return;
    }

    // TODO(spencer): Use a condition variable to await loading a config.
    if (state.startup()) {
        // Wait for a config to be loaded
        sleepsecs(1);
        return;
    }

    // We need to wait until initial sync has started.
    if (_replCoord->getMyLastOptime().isNull()) {
        sleepsecs(1);
        return;
    }
    // we want to unpause when we're no longer primary
    // start() also loads _lastOpTimeFetched, which we know is set from the "if"
    OperationContextImpl txn;
    if (isPaused()) {
        start(&txn);
    }
    _produce(&txn);
}
// Runs one fetch pass: chooses a sync source, tails its oplog via a Fetcher
// (batches land in _fetcherCallback), and, if the remote oplog no longer
// contains our last fetched op, runs rollback against that source.
void BackgroundSync::_produce(OperationContext* txn) {
    // this oplog reader does not do a handshake because we don't want the server it's syncing
    // from to track how far it has synced
    {
        stdx::unique_lock lock(_mutex);
        if (_lastOpTimeFetched.isNull()) {
            // then we're initial syncing and we're still waiting for this to be set
            lock.unlock();
            sleepsecs(1);
            // if there is no one to sync from
            return;
        }

        if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary() ||
            inShutdownStrict()) {
            return;
        }
    }

    // Fail-point hook: spin (yielding the CPU) while rsBgSyncProduce is enabled.
    while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
        sleepmillis(0);
    }

    // find a target to sync from the last optime fetched
    OpTime lastOpTimeFetched;
    {
        stdx::unique_lock lock(_mutex);
        lastOpTimeFetched = _lastOpTimeFetched;
        _syncSourceHost = HostAndPort();
    }
    OplogReader syncSourceReader;
    syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);

    // no server found
    if (syncSourceReader.getHost().empty()) {
        sleepsecs(1);
        // if there is no one to sync from
        return;
    }

    long long lastHashFetched;
    {
        stdx::lock_guard lock(_mutex);
        if (_pause) {
            // stop() ran while we were choosing a source; abandon this pass.
            return;
        }
        // Re-read the fetch position under the lock in case it advanced while
        // we were connecting.
        lastOpTimeFetched = _lastOpTimeFetched;
        lastHashFetched = _lastFetchedHash;
        _syncSourceHost = syncSourceReader.getHost();
        _replCoord->signalUpstreamUpdater();
    }

    const Milliseconds oplogSocketTimeout(OplogReader::kSocketTimeout);

    const auto isV1ElectionProtocol = _replCoord->isV1ElectionProtocol();
    // Under protocol version 1, make the awaitData timeout (maxTimeMS) dependent on the election
    // timeout. This enables the sync source to communicate liveness of the primary to secondaries.
    // Under protocol version 0, use a default timeout of 2 seconds for awaitData.
    const Milliseconds fetcherMaxTimeMS(
        isV1ElectionProtocol ? _replCoord->getConfig().getElectionTimeoutPeriod() / 2 : Seconds(2));

    // Prefer host in oplog reader to _syncSourceHost because _syncSourceHost may be cleared
    // if sync source feedback fails.
    const HostAndPort source = syncSourceReader.getHost();
    syncSourceReader.resetConnection();
    // no more references to oplog reader from here on.

    // If this status is not OK after the fetcher returns from wait(),
    // proceed to execute rollback
    Status remoteOplogStartStatus = Status::OK();

    auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback,
                                      this,
                                      stdx::placeholders::_1,
                                      stdx::placeholders::_3,
                                      stdx::cref(source),
                                      lastOpTimeFetched,
                                      lastHashFetched,
                                      fetcherMaxTimeMS,
                                      &remoteOplogStartStatus);

    // Build a tailable, awaitData find on the oplog starting at our last
    // fetched timestamp (inclusive, so the callback can verify overlap).
    BSONObjBuilder cmdBob;
    cmdBob.append("find", nsToCollectionSubstring(rsOplogName));
    cmdBob.append("filter", BSON("ts" << BSON("$gte" << lastOpTimeFetched.getTimestamp())));
    cmdBob.append("tailable", true);
    cmdBob.append("oplogReplay", true);
    cmdBob.append("awaitData", true);
    cmdBob.append("maxTimeMS", durationCount(fetcherMaxTimeMS));

    BSONObjBuilder metadataBob;
    if (isV1ElectionProtocol) {
        cmdBob.append("term", _replCoord->getTerm());
        // Request replica set metadata (liveness info) with each response.
        metadataBob.append(rpc::kReplSetMetadataFieldName, 1);
    }

    auto dbName = nsToDatabase(rsOplogName);
    auto cmdObj = cmdBob.obj();
    auto metadataObj = metadataBob.obj();
    Fetcher fetcher(&_threadPoolTaskExecutor,
                    source,
                    dbName,
                    cmdObj,
                    fetcherCallback,
                    metadataObj,
                    _replCoord->getConfig().getElectionTimeoutPeriod());
    auto scheduleStatus = fetcher.schedule();
    if (!scheduleStatus.isOK()) {
        warning() << "unable to schedule fetcher to read remote oplog on " << source << ": "
                  << scheduleStatus;
        return;
    }
    // Block until the fetcher finishes (callback stopped requesting getMores,
    // an error occurred, or the executor shut down).
    fetcher.wait();

    // If the background sync is paused after the fetcher is started, we need to
    // re-evaluate our sync source and oplog common point.
    if (isPaused()) {
        return;
    }

    // Execute rollback if necessary.
    // Rollback is a synchronous operation that uses the task executor and may not be
    // executed inside the fetcher callback.
    if (!remoteOplogStartStatus.isOK()) {
        const int messagingPortTags = 0;
        ConnectionPool connectionPool(messagingPortTags);
        std::unique_ptr connection;
        // Lazily establishes (and caches for reuse) a pooled connection to the
        // sync source for the duration of the rollback.
        auto getConnection =
            [&connection, &connectionPool, oplogSocketTimeout, source]() -> DBClientBase* {
                if (!connection.get()) {
                    connection.reset(new ConnectionPool::ConnectionPtr(
                        &connectionPool, source, Date_t::now(), oplogSocketTimeout));
                };  // NOTE(review): stray ';' after the if-block — harmless empty statement
                return connection->get();
            };

        log() << "starting rollback: " << remoteOplogStartStatus;
        _rollback(txn, source, getConnection);
        stop();
    }
}
// Fetcher callback, invoked once per batch received from the sync source.
// On the first batch it validates overlap with our last fetched op (setting
// *remoteOplogStartStatus to trigger rollback on mismatch), then pushes the
// documents onto the buffer. Filling in 'bob' at the end requests another
// getMore; returning without filling it stops the fetcher.
void BackgroundSync::_fetcherCallback(const StatusWith& result,
                                      BSONObjBuilder* bob,
                                      const HostAndPort& source,
                                      OpTime lastOpTimeFetched,
                                      long long lastFetchedHash,
                                      Milliseconds fetcherMaxTimeMS,
                                      Status* remoteOplogStartStatus) {
    // if target cut connections between connecting and querying (for
    // example, because it stepped down) we might not have a cursor
    if (!result.isOK()) {
        return;
    }

    if (inShutdown()) {
        return;
    }

    // Check if we have been paused.
    if (isPaused()) {
        return;
    }

    const auto& queryResponse = result.getValue();

    // Forward metadata (containing liveness information) to replication coordinator.
    bool receivedMetadata =
        queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
    if (receivedMetadata) {
        auto metadataResult =
            rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
        if (!metadataResult.isOK()) {
            error() << "invalid replication metadata from sync source " << source << ": "
                    << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
            return;
        }
        const auto& metadata = metadataResult.getValue();
        _replCoord->processReplSetMetadata(metadata);
        if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
            // The sync source knows of a primary, so reset our election timeout.
            _replCoord->cancelAndRescheduleElectionTimeout();
        }
    }

    const auto& documents = queryResponse.documents;
    auto documentBegin = documents.cbegin();
    auto documentEnd = documents.cend();

    // Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
    if (queryResponse.first) {
        // Hands the first document (if any) to checkRemoteOplogStart, advancing
        // documentBegin past it so it is not re-pushed below.
        auto getNextOperation = [&documentBegin, documentEnd]() -> StatusWith {
            if (documentBegin == documentEnd) {
                return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing");
            }
            return *(documentBegin++);
        };

        *remoteOplogStartStatus =
            checkRemoteOplogStart(getNextOperation, lastOpTimeFetched, lastFetchedHash);
        if (!remoteOplogStartStatus->isOK()) {
            // Stop fetcher and execute rollback.
            return;
        }

        // If this is the first batch and no rollback is needed, we should have advanced
        // the document iterator.
        invariant(documentBegin != documents.cbegin());
    }

    // process documents
    int currentBatchMessageSize = 0;
    for (auto documentIter = documentBegin; documentIter != documentEnd; ++documentIter) {
        if (inShutdown()) {
            return;
        }

        // If we are transitioning to primary state, we need to leave
        // this loop in order to go into bgsync-pause mode.
        if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) {
            LOG(1) << "waiting for draining or we are primary, not adding more ops to buffer";
            return;
        }

        // At this point, we are guaranteed to have at least one thing to read out
        // of the fetcher.
        const BSONObj& o = *documentIter;
        currentBatchMessageSize += o.objsize();
        opsReadStats.increment();

        if (MONGO_FAIL_POINT(stepDownWhileDrainingFailPoint)) {
            sleepsecs(20);
        }

        {
            stdx::unique_lock lock(_mutex);
            _appliedBuffer = false;
        }

        OCCASIONALLY {
            LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes";
        }

        // Gauges are incremented before the push; consume() decrements them.
        bufferCountGauge.increment();
        bufferSizeGauge.increment(getSize(o));
        _buffer.push(o);

        {
            // Record the op just pushed as our new fetch position.
            stdx::unique_lock lock(_mutex);
            _lastFetchedHash = o["h"].numberLong();
            _lastOpTimeFetched = fassertStatusOK(28770, OpTime::parseFromOplogEntry(o));
            LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched;
        }
    }

    // record time for each batch
    getmoreReplStats.recordMillis(durationCount(queryResponse.elapsedMillis));
    networkByteStats.increment(currentBatchMessageSize);

    // Check some things periodically
    // (whenever we run out of items in the
    // current cursor batch)
    if (currentBatchMessageSize > 0 && currentBatchMessageSize < BatchIsSmallish) {
        // on a very low latency network, if we don't wait a little, we'll be
        // getting ops to write almost one at a time. this will both be expensive
        // for the upstream server as well as potentially defeating our parallel
        // application of batches on the secondary.
        //
        // the inference here is basically if the batch is really small, we are
        // "caught up".
        //
        sleepmillis(SleepToAllowBatchingMillis);
    }

    // If we are transitioning to primary state, we need to leave
    // this loop in order to go into bgsync-pause mode.
    if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) {
        return;
    }

    // re-evaluate quality of sync target
    if (_shouldChangeSyncSource(source)) {
        return;
    }

    // Check if we have been paused.
    if (isPaused()) {
        return;
    }

    // We fill in 'bob' to signal the fetcher to process with another getMore.
    invariant(bob);
    bob->append("getMore", queryResponse.cursorId);
    bob->append("collection", queryResponse.nss.coll());
    bob->append("maxTimeMS", durationCount(fetcherMaxTimeMS));
    if (receivedMetadata) {
        bob->append("term", _replCoord->getTerm());
    }
}
// Decides whether to abandon the current sync source. A vanished source
// (either ours or the candidate's) always forces a change; otherwise defer
// to the replication coordinator's lag-based evaluation of other members.
bool BackgroundSync::_shouldChangeSyncSource(const HostAndPort& syncSource) {
    const bool sourceGone = getSyncTarget().empty() || syncSource.empty();
    return sourceGone || _replCoord->shouldChangeSyncSource(syncSource);
}
// Non-blocking peek at the head of the fetch buffer. Returns whether an op
// was available; on success the op is copied into *op without removing it.
bool BackgroundSync::peek(BSONObj* op) {
    const bool available = _buffer.peek(*op);
    return available;
}
void BackgroundSync::waitForMore() {
BSONObj op;
// Block for one second before timing out.
// Ignore the value of the op we peeked at.
_buffer.blockingPeek(op, 1);
}
void BackgroundSync::consume() {
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
BSONObj op = _buffer.blockingPop();
bufferCountGauge.decrement(1);
bufferSizeGauge.decrement(getSize(op));
}
// Runs rollback against 'source'. Fatal (fassertNoTrace) only on
// UnrecoverableRollbackError; any other failure is logged and rollback is
// retried on a later producer pass.
void BackgroundSync::_rollback(OperationContext* txn,
                               const HostAndPort& source,
                               stdx::function getConnection) {
    // Abort only when syncRollback detects we are in a unrecoverable state.
    // In other cases, we log the message contained in the error status and retry later.
    auto status = syncRollback(txn,
                               _replCoord->getMyLastOptime(),
                               OplogInterfaceLocal(txn, rsOplogName),
                               RollbackSourceImpl(getConnection, source, rsOplogName),
                               _replCoord);
    if (status.isOK()) {
        return;
    }
    if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
        fassertNoTrace(28723, status);
    }
    warning() << "rollback cannot proceed at this time (retrying later): " << status;
}
// Returns the host we are currently syncing from (empty if none).
HostAndPort BackgroundSync::getSyncTarget() {
    // lock_guard suffices: the lock is never released early.
    stdx::lock_guard<stdx::mutex> lock(_mutex);
    return _syncSourceHost;
}
void BackgroundSync::clearSyncTarget() {
stdx::unique_lock lock(_mutex);
_syncSourceHost = HostAndPort();
}
// Interrupts any in-flight oplog fetch by cancelling all commands scheduled
// on the task executor.
void BackgroundSync::cancelFetcher() {
    _threadPoolTaskExecutor.cancelAllCommands();
}
// Pauses the producer: clears the sync source and fetch position so a later
// start() reloads them from the local oplog, and wakes any waiters.
void BackgroundSync::stop() {
    stdx::lock_guard lock(_mutex);

    _pause = true;
    _syncSourceHost = HostAndPort();
    _lastOpTimeFetched = OpTime();
    _lastFetchedHash = 0;
    _appliedBufferCondition.notify_all();
    _pausedCondition.notify_all();
}
// Unpauses the producer, seeding the fetch position (optime + hash) from our
// own last-applied oplog entry.
void BackgroundSync::start(OperationContext* txn) {
    // NOTE(review): _buffer is inspected before _mutex is taken — presumably
    // the buffer is internally synchronized; confirm against its type.
    massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty());

    long long lastFetchedHash = _readLastAppliedHash(txn);
    stdx::lock_guard lk(_mutex);
    _pause = false;

    // reset _last fields with current oplog data
    _lastOpTimeFetched = _replCoord->getMyLastOptime();
    _lastFetchedHash = lastFetchedHash;

    LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
bool BackgroundSync::isPaused() const {
stdx::lock_guard lock(_mutex);
return _pause;
}
void BackgroundSync::waitUntilPaused() {
stdx::unique_lock lock(_mutex);
while (!_pause) {
_pausedCondition.wait(lock);
}
}
// Discards all ops in the fetch buffer. Note: the buffer count/size gauges
// are not adjusted here.
void BackgroundSync::clearBuffer() {
    _buffer.clear();
}
// Reads the hash ("h") field of the newest entry in the local oplog.
// Returns 0 when the oplog is empty (before initial sync completes);
// fasserts if the oplog cannot be read, or its last entry lacks a hash field,
// or the hash is not a NumberLong.
long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
    BSONObj oplogEntry;
    try {
        // Retry the read on write-conflict, re-acquiring locks each attempt.
        MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
            ScopedTransaction transaction(txn, MODE_IX);
            Lock::DBLock lk(txn->lockState(), "local", MODE_X);
            bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry);
            if (!success) {
                // This can happen when we are to do an initial sync. lastHash will be set
                // after the initial sync is complete.
                return 0;
            }
        }
        MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName);
    } catch (const DBException& ex) {
        severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus();
        fassertFailed(18904);
    }
    BSONElement hashElement = oplogEntry[hashFieldName];
    if (hashElement.eoo()) {
        severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName
                 << "\" field";
        fassertFailed(18902);
    }
    if (hashElement.type() != NumberLong) {
        severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << rsOplogName
                 << " entry to have type NumberLong, but found " << typeName(hashElement.type());
        fassertFailed(18903);
    }
    return hashElement.safeNumberLong();
}
bool BackgroundSync::getInitialSyncRequestedFlag() {
stdx::lock_guard lock(_initialSyncMutex);
return _initialSyncRequestedFlag;
}
void BackgroundSync::setInitialSyncRequestedFlag(bool value) {
stdx::lock_guard lock(_initialSyncMutex);
_initialSyncRequestedFlag = value;
}
void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) {
stdx::lock_guard lock(_mutex);
_buffer.push(op);
}
} // namespace repl
} // namespace mongo