/**
* Copyright (C) 2012 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/base/counter.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/repl_coordinator_impl.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
// When a network batch comes back smaller than BatchIsSmallish, the producer
// sleeps this many milliseconds before the next getMore so the upstream node
// can accumulate a bigger batch (see produce()).
int SleepToAllowBatchingMillis = 2;
// Threshold (bytes) below which a batch is considered "small" for the purpose above.
const int BatchIsSmallish = 40000; // bytes

MONGO_FP_DECLARE(rsBgSyncProduce);

// Lazily-constructed process-wide singleton (see BackgroundSync::get()),
// guarded by s_mutex.
BackgroundSync* BackgroundSync::s_instance = 0;
boost::mutex BackgroundSync::s_mutex;

// The number and time spent reading batches off the network
static TimerStats getmoreReplStats;
static ServerStatusMetricField<TimerStats> displayBatchesRecieved(
                                                    "repl.network.getmores",
                                                    &getmoreReplStats );
// The oplog entries read via the oplog reader
static Counter64 opsReadStats;
static ServerStatusMetricField<Counter64> displayOpsRead( "repl.network.ops",
                                                          &opsReadStats );
// The bytes read via the oplog reader
static Counter64 networkByteStats;
static ServerStatusMetricField<Counter64> displayBytesRead( "repl.network.bytes",
                                                            &networkByteStats );
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount( "repl.buffer.count",
                                                              &bufferCountGauge );
// The size (bytes) of items in the buffer
static Counter64 bufferSizeGauge;
static ServerStatusMetricField<Counter64> displayBufferSize( "repl.buffer.sizeBytes",
                                                             &bufferSizeGauge );
// The max size (bytes) of the buffer
static int bufferMaxSizeGauge = 256*1024*1024;
static ServerStatusMetricField<int> displayBufferMaxSize( "repl.buffer.maxSizeBytes",
                                                          &bufferMaxSizeGauge );
// Out-of-line definition of the interface's virtual destructor; anchors the
// vtable in this translation unit.
BackgroundSyncInterface::~BackgroundSyncInterface() {}
/**
 * Size-measuring callback for the blocking queue: returns the serialized
 * size of a BSON object in bytes.
 */
size_t getSize(const BSONObj& o) {
    // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
    return static_cast<size_t>(o.objsize());
}
// Constructs the producer in its initial, paused state: the bounded buffer is
// capped at bufferMaxSizeGauge bytes and sized via getSize(); the _last* fields
// start zeroed and are seeded later by start().
BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize),
                                   _lastOpTimeFetched(0, 0),
                                   _lastH(0),
                                   _pause(true),          // starts paused; start() unpauses
                                   _appliedBuffer(true),  // nothing buffered => trivially applied
                                   _assumingPrimary(false),
                                   _currentSyncTarget(NULL),
                                   _replCoord(getGlobalReplicationCoordinator()) {
}
/**
 * Returns the process-wide BackgroundSync singleton, constructing it on first
 * use. Returns NULL if the server is shutting down and the instance was never
 * created. Thread-safe via s_mutex.
 */
BackgroundSync* BackgroundSync::get() {
    boost::unique_lock<boost::mutex> lock(s_mutex);
    if (s_instance == NULL && !inShutdown()) {
        s_instance = new BackgroundSync();
    }
    return s_instance;
}
// Shutdown hook: wakes any threads blocked on the buffer-applied condition
// (via notify()) so they can observe shutdown and exit their waits.
void BackgroundSync::shutdown() {
    notify();
}
void BackgroundSync::notify() {
{
boost::unique_lock lock(s_instance->_mutex);
// If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting)
if (s_instance->_buffer.empty()) {
s_instance->_appliedBuffer = true;
s_instance->_condvar.notify_all();
}
}
}
/**
 * Entry point for the background-sync producer thread. Loops until global
 * shutdown, delegating each pass to _producerThread(). While no valid replica
 * set config has been received, sleeps 20s between checks. DBExceptions from a
 * pass are surfaced via the heartbeat message and retried immediately; any
 * other std::exception is reported and followed by a 60-second back-off.
 */
void BackgroundSync::producerThread() {
    Client::initThread("rsBackgroundSync");
    replLocalAuth();

    while (!inShutdown()) {
        // Nothing to sync until a replica-set config arrives.
        if (!theReplSet) {
            log() << "replSet warning did not receive a valid config yet, sleeping 20 seconds " << rsLog;
            sleepsecs(20);
            continue;
        }

        try {
            _producerThread();
        }
        catch (const DBException& e) {
            // Expected transient failures (sync-source problems): report and retry.
            sethbmsg(str::stream() << "sync source problem: " << e.toString());
        }
        catch (const std::exception& e2) {
            // Unexpected failure: report and back off before the next attempt.
            sethbmsg(str::stream() << "exception in producer: " << e2.what());
            sleepsecs(60);
        }
    }

    // Tear down this thread's Client state before exiting.
    cc().shutdown();
}
/**
 * One pass of the producer loop: inspects replica-set state and either pauses
 * syncing (when primary or becoming primary), waits (fatal/startup state or
 * empty local oplog), or unpauses and fetches more oplog data via produce().
 */
void BackgroundSync::_producerThread() {
    OperationContextImpl txn;
    MemberState state = theReplSet->state();

    // we want to pause when the state changes to primary
    if (isAssumingPrimary() || state.primary()) {
        if (!_pause) {
            stop();
        }
        sleepsecs(1);
        return;
    }

    // Fatal or still-starting states: nothing useful to do yet.
    if (state.fatal() || state.startup()) {
        sleepsecs(5);
        return;
    }

    // if this member has an empty oplog, we cannot start syncing
    if (theReplSet->lastOpTimeWritten.isNull()) {
        sleepsecs(1);
        return;
    }
    // we want to unpause when we're no longer primary
    // start() also loads _lastOpTimeFetched, which we know is set from the "if"
    else if (_pause) {
        start();
    }

    produce(&txn);
}
/**
 * Core producer loop: connects an OplogReader to a sync source, tails its
 * oplog starting at _lastOpTimeFetched, and pushes each fetched op into the
 * blocking buffer for the applier threads. Returns (to be re-entered by
 * _producerThread) whenever the sync source becomes unusable, the node is
 * transitioning to primary, a better sync target should be chosen, or a
 * rollback is required.
 */
void BackgroundSync::produce(OperationContext* txn) {
    // this oplog reader does not do a handshake because we don't want the server it's syncing
    // from to track how far it has synced
    OplogReader r;

    {
        boost::unique_lock<boost::mutex> lock(_mutex);
        if (_lastOpTimeFetched.isNull()) {
            // then we're initial syncing and we're still waiting for this to be set
            _currentSyncTarget = NULL;
            lock.unlock();
            sleepsecs(1);
            // if there is no one to sync from
            return;
        }

        // Wait until we've applied the ops we have before we choose a sync target
        while (!_appliedBuffer) {
            _condvar.wait(lock);
        }
    }

    // Test hook: spin while the fail point is enabled.
    while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
        sleepmillis(0);
    }

    // find a target to sync from the last op time written
    _replCoord->connectOplogReader(txn, this, &r);

    OpTime lastOpTimeFetched;
    {
        boost::unique_lock<boost::mutex> lock(_mutex);
        // no server found
        if (_currentSyncTarget == NULL) {
            lock.unlock();
            sleepsecs(1);
            // if there is no one to sync from
            return;
        }
        lastOpTimeFetched = _lastOpTimeFetched;
    }

    r.tailingQueryGTE(rsoplog, lastOpTimeFetched);

    // if target cut connections between connecting and querying (for
    // example, because it stepped down) we might not have a cursor
    if (!r.haveCursor()) {
        return;
    }

    if (isRollbackRequired(txn, r)) {
        stop();
        return;
    }

    while (!inShutdown()) {
        if (!r.moreInCurrentBatch()) {
            // Check some things periodically
            // (whenever we run out of items in the
            // current cursor batch)

            int bs = r.currentBatchMessageSize();
            if( bs > 0 && bs < BatchIsSmallish ) {
                // on a very low latency network, if we don't wait a little, we'll be
                // getting ops to write almost one at a time.  this will both be expensive
                // for the upstream server as well as potentially defeating our parallel
                // application of batches on the secondary.
                //
                // the inference here is basically if the batch is really small, we are
                // "caught up".
                //
                sleepmillis(SleepToAllowBatchingMillis);
            }

            if (theReplSet->gotForceSync()) {
                return;
            }

            // If we are transitioning to primary state, we need to leave
            // this loop in order to go into bgsync-pause mode.
            if (isAssumingPrimary() || theReplSet->isPrimary()) {
                return;
            }

            // re-evaluate quality of sync target
            if (shouldChangeSyncTarget()) {
                return;
            }

            {
                //record time for each getmore
                TimerHolder batchTimer(&getmoreReplStats);

                // This calls receiveMore() on the oplogreader cursor.
                // It can wait up to five seconds for more data.
                r.more();
            }
            networkByteStats.increment(r.currentBatchMessageSize());

            if (!r.moreInCurrentBatch()) {
                // If there is still no data from upstream, check a few more things
                // and then loop back for another pass at getting more data
                {
                    boost::unique_lock<boost::mutex> lock(_mutex);
                    if (_pause ||
                        !_currentSyncTarget ||
                        !_currentSyncTarget->hbinfo().hbstate.readable()) {
                        return;
                    }
                }

                r.tailCheck();
                if( !r.haveCursor() ) {
                    LOG(1) << "replSet end syncTail pass" << rsLog;
                    return;
                }

                continue;
            }
        }

        // At this point, we are guaranteed to have at least one thing to read out
        // of the oplogreader cursor.
        BSONObj o = r.nextSafe().getOwned();
        opsReadStats.increment();

        {
            boost::unique_lock<boost::mutex> lock(_mutex);
            _appliedBuffer = false;
        }

        OCCASIONALLY {
            LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes" << rsLog;
        }

        // the blocking queue will wait (forever) until there's room for us to push
        _buffer.push(o);
        bufferCountGauge.increment();
        bufferSizeGauge.increment(getSize(o));

        {
            boost::unique_lock<boost::mutex> lock(_mutex);
            // Remember where we are in the stream for the next tailing query.
            _lastH = o["h"].numberLong();
            _lastOpTimeFetched = o["ts"]._opTime();
            LOG(3) << "replSet lastOpTimeFetched: "
                   << _lastOpTimeFetched.toStringPretty() << rsLog;
        }
    }
}
bool BackgroundSync::shouldChangeSyncTarget() {
boost::unique_lock lock(_mutex);
// is it even still around?
if (!_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
return true;
}
// check other members: is any member's optime more than 30 seconds ahead of the guy we're
// syncing from?
return theReplSet->shouldChangeSyncTarget(_currentSyncTarget->hbinfo().opTime);
}
// Non-destructively copies the oldest buffered op into *op.
// Returns false if the buffer is empty.
bool BackgroundSync::peek(BSONObj* op) {
    return _buffer.peek(*op);
}
// Blocks until the buffer has at least one op, or one second elapses —
// lets the applier wait for work without busy-polling peek().
void BackgroundSync::waitForMore() {
    BSONObj op;
    // Block for one second before timing out.
    // Ignore the value of the op we peeked at.
    _buffer.blockingPeek(op, 1);
}
void BackgroundSync::consume() {
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
BSONObj op = _buffer.blockingPop();
bufferCountGauge.decrement(1);
bufferSizeGauge.decrement(getSize(op));
}
/**
 * Checks whether this node has fallen off the back of the remote's oplog.
 * Fetches the remote's oldest oplog entry into remoteOldestOp (out-param) and
 * returns true when our last fetched optime is older than it — i.e. the remote
 * no longer has the ops we need and cannot be used as a sync source.
 */
bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) {
    remoteOldestOp = r.findOne(rsoplog, Query());
    OpTime remoteTs = remoteOldestOp["ts"]._opTime();
    DEV {
        log() << "replSet remoteOldestOp:    " << remoteTs.toStringLong() << rsLog;
        log() << "replSet lastOpTimeFetched: " << _lastOpTimeFetched.toStringLong() << rsLog;
    }

    {
        boost::unique_lock<boost::mutex> lock(_mutex);
        if (_lastOpTimeFetched >= remoteTs) {
            return false;
        }
    }

    return true;
}
/**
 * Legacy sync-source selection: iterates candidates from
 * theReplSet->getMemberToSyncTo(), vetoing unreachable ones (60s default) and
 * stale ones (600s), until a connectable, non-stale target is found and
 * published as _currentSyncTarget. If the only viable target was stale, goes
 * into "stale" mode (goStale + 120s sleep) and clears the sync target.
 */
void BackgroundSync::getOplogReaderLegacy(OperationContext* txn, OplogReader* r) {
    const Member *target = NULL, *stale = NULL;
    BSONObj oldest;

    verify(r->conn() == NULL);

    while ((target = theReplSet->getMemberToSyncTo()) != NULL) {
        string current = target->fullName();

        if (!r->connect(target->h())) {
            LOG(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
            r->resetConnection();
            theReplSet->veto(current);
            sleepsecs(1);
            continue;
        }

        if (isStale(*r, oldest)) {
            // Too stale for this candidate; remember it in case it's the only option.
            r->resetConnection();
            theReplSet->veto(current, 600);
            stale = target;
            continue;
        }

        // if we made it here, the target is up and not stale
        {
            boost::unique_lock<boost::mutex> lock(_mutex);
            // this will trigger the syncSourceFeedback
            _currentSyncTarget = target;
        }

        return;
    }

    // the only viable sync target was stale
    if (stale) {
        theReplSet->goStale(txn, stale, oldest);
        // vetoed for 600 seconds in isStale()'s caller above; sleep before retrying
        sleepsecs(120);
    }

    {
        boost::unique_lock<boost::mutex> lock(_mutex);
        _currentSyncTarget = NULL;
    }
}
/**
 * Connects `reader` to a usable sync source chosen by the replication
 * coordinator. Candidates that are unreachable are blacklisted for 10s;
 * candidates with malformed oplogs, or whose oldest op is newer than our last
 * fetched op (we're too stale for them), are blacklisted for 600s. If every
 * candidate is exhausted and we were too stale for all of them, records the
 * minValid point and drops to RECOVERING. Returns with the reader either
 * connected to a valid source or not connected at all.
 */
void BackgroundSync::connectOplogReader(OperationContext* txn,
                                        ReplicationCoordinatorImpl* replCoordImpl,
                                        OplogReader* reader) {
    OpTime lastOpTimeFetched;
    {
        boost::unique_lock<boost::mutex> lock(_mutex);
        lastOpTimeFetched = _lastOpTimeFetched;
    }
    Date_t now(curTimeMillis64());
    OpTime oldestOpTimeSeen(now,0);

    while (true) {
        HostAndPort candidate = replCoordImpl->chooseNewSyncSource();

        if (candidate.empty()) {
            if (oldestOpTimeSeen == OpTime(now,0)) {
                // If, in this invocation of connectOplogReader(), we did not successfully
                // connect to any node ahead of us,
                // we apparently have no sync sources to connect to.
                // This situation is common; e.g. if there are no writes to the primary at
                // the moment.
                return;
            }

            // Connected to at least one member, but in all cases we were too stale to use them
            // as a sync source.
            log() << "replSet error RS102 too stale to catch up" << rsLog;
            log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog;
            log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() <<
                rsLog;
            log() << "replSet "
                "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
                  << rsLog;
            sethbmsg("error RS102 too stale to catch up");
            theReplSet->setMinValid(txn, oldestOpTimeSeen);
            replCoordImpl->setFollowerMode(MemberState::RS_RECOVERING);
            return;
        }

        if (!reader->connect(candidate)) {
            LOG(2) << "replSet can't connect to " << candidate.toString() <<
                " to read operations" << rsLog;
            reader->resetConnection();
            replCoordImpl->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000));
            continue;
        }
        // Read the first (oldest) op and confirm that it's not newer than our last
        // fetched op. Otherwise, we have fallen off the back of that source's oplog.
        BSONObj remoteOldestOp(reader->findOne(rsoplog, Query()));
        BSONElement tsElem(remoteOldestOp["ts"]);
        if (tsElem.type() != Timestamp) {
            // This member's got a bad op in its oplog.
            warning() << "oplog invalid format on node " << candidate.toString();
            reader->resetConnection();
            replCoordImpl->blacklistSyncSource(candidate,
                                               Date_t(curTimeMillis64() + 600*1000));
            continue;
        }
        OpTime remoteOldOpTime = tsElem._opTime();

        if (lastOpTimeFetched < remoteOldOpTime) {
            // We're too stale to use this sync source.
            reader->resetConnection();
            replCoordImpl->blacklistSyncSource(candidate,
                                               Date_t(curTimeMillis64() + 600*1000));
            if (oldestOpTimeSeen > remoteOldOpTime) {
                // Track the most-usable (oldest) remote optime we saw, for the RS102 report.
                warning() << "we are too stale to use " << candidate.toString() <<
                    " as a sync source";
                oldestOpTimeSeen = remoteOldOpTime;
            }
            continue;
        }

        // Got a valid sync source.
        return;
    } // while (true)
}
/**
 * Decides whether this node must roll back before syncing from the reader's
 * source, and initiates the rollback when so.
 *
 * Returns true when the caller should stop this sync attempt: either a
 * rollback was started (we are ahead of the source, or our last fetched
 * (ts, h) pair does not match the source's first returned op), or the source
 * could not be evaluated (empty cursor/oplog, query error). Returns false
 * only when the source's first op exactly matches our last fetched op.
 */
bool BackgroundSync::isRollbackRequired(OperationContext* txn, OplogReader& r) {
    string hn = r.conn()->getServerAddress();

    // No data in the tailing cursor: inspect the source's last op directly.
    if (!r.more()) {
        try {
            BSONObj theirLastOp = r.getLastOp(rsoplog);
            if (theirLastOp.isEmpty()) {
                log() << "replSet error empty query result from " << hn << " oplog" << rsLog;
                sleepsecs(2);
                return true;
            }
            OpTime theirTS = theirLastOp["ts"]._opTime();
            if (theirTS < _lastOpTimeFetched) {
                log() << "replSet we are ahead of the sync source, will try to roll back"
                      << rsLog;
                theReplSet->syncRollback(txn, r);
                return true;
            }
            /* we're not ahead?  maybe our new query got fresher data.  best to come back and try again */
            log() << "replSet syncTail condition 1" << rsLog;
            sleepsecs(1);
        }
        catch(DBException& e) {
            log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog;
            sleepsecs(2);
        }
        return true;
    }

    // The cursor has data: its first op must be exactly where we left off
    // (same timestamp AND same hash), otherwise our oplog has diverged.
    BSONObj o = r.nextSafe();
    OpTime ts = o["ts"]._opTime();
    long long h = o["h"].numberLong();
    if( ts != _lastOpTimeFetched || h != _lastH ) {
        log() << "replSet our last op time fetched: " << _lastOpTimeFetched.toStringPretty() << rsLog;
        log() << "replset source's GTE: " << ts.toStringPretty() << rsLog;
        theReplSet->syncRollback(txn, r);
        return true;
    }

    return false;
}
// Returns the member currently being synced from, or NULL when there is none.
// Thread-safe snapshot under _mutex.
const Member* BackgroundSync::getSyncTarget() {
    boost::unique_lock<boost::mutex> lock(_mutex);
    return _currentSyncTarget;
}
void BackgroundSync::stop() {
boost::unique_lock lock(_mutex);
_pause = true;
_currentSyncTarget = NULL;
_lastOpTimeFetched = OpTime(0,0);
_lastH = 0;
_condvar.notify_all();
}
void BackgroundSync::start() {
massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty());
boost::unique_lock lock(_mutex);
_pause = false;
// reset _last fields with current data
_lastOpTimeFetched = theReplSet->lastOpTimeWritten;
_lastH = theReplSet->lastH;
LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastH << rsLog;
}
bool BackgroundSync::isAssumingPrimary() {
boost::unique_lock lck(_mutex);
return _assumingPrimary;
}
void BackgroundSync::stopReplicationAndFlushBuffer() {
boost::unique_lock lck(_mutex);
// 1. Tell syncing to stop
_assumingPrimary = true;
// 2. Wait for syncing to stop and buffer to be applied
while (!(_pause && _appliedBuffer)) {
_condvar.wait(lck);
}
// 3. Now actually become primary
_assumingPrimary = false;
}
} // namespace repl
} // namespace mongo