summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2014-09-23 14:46:55 -0400
committerEric Milkie <milkie@10gen.com>2014-09-26 11:27:06 -0400
commite86e08deff7293b5778fad27df9031c013595b12 (patch)
tree9c24931717b261980a0591ab40192cbac9d101ce /src/mongo
parent128ef4c4bcf312fbe6339181e377d12744165cf9 (diff)
downloadmongo-e86e08deff7293b5778fad27df9031c013595b12.tar.gz
SERVER-15089 Add new Applier class and remove theReplSet references from BackgroundSync
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/bgsync.cpp307
-rw-r--r--src/mongo/db/repl/bgsync.h62
-rw-r--r--src/mongo/db/repl/health.cpp6
-rw-r--r--src/mongo/db/repl/heartbeat.cpp4
-rw-r--r--src/mongo/db/repl/oplog.cpp73
-rw-r--r--src/mongo/db/repl/oplogreader.cpp81
-rw-r--r--src/mongo/db/repl/oplogreader.h15
-rw-r--r--src/mongo/db/repl/repl_coordinator.h33
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state.h14
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_impl.cpp21
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_impl.h2
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_mock.cpp16
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_mock.h6
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp58
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.h12
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp61
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h39
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.cpp41
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.h12
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.cpp21
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.h12
-rw-r--r--src/mongo/db/repl/repl_set.h1
-rw-r--r--src/mongo/db/repl/repl_set_impl.cpp125
-rw-r--r--src/mongo/db/repl/repl_set_impl.h19
-rw-r--r--src/mongo/db/repl/rs.cpp4
-rw-r--r--src/mongo/db/repl/rs_config.cpp2
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp378
-rw-r--r--src/mongo/db/repl/rs_initialsync.h39
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp9
-rw-r--r--src/mongo/db/repl/rs_sync.cpp80
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp10
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h6
-rw-r--r--src/mongo/db/repl/sync_tail.cpp37
-rw-r--r--src/mongo/db/repl/sync_tail.h1
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp2
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp2
36 files changed, 824 insertions, 787 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 1d85caacdd1..68557c4056f 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
-#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/repl_coordinator_impl.h"
@@ -54,8 +53,11 @@ namespace mongo {
namespace repl {
+namespace {
+ const char hashFieldName[] = "h";
int SleepToAllowBatchingMillis = 2;
const int BatchIsSmallish = 40000; // bytes
+} // namespace
MONGO_FP_DECLARE(rsBgSyncProduce);
@@ -98,12 +100,12 @@ namespace repl {
}
BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize),
- _lastOpTimeFetched(0, 0),
- _lastH(0),
+ _lastOpTimeFetched(std::numeric_limits<int>::max(),
+ 0),
+ _lastHash(0),
_pause(true),
_appliedBuffer(true),
_assumingPrimary(false),
- _currentSyncTarget(NULL),
_replCoord(getGlobalReplicationCoordinator()) {
}
@@ -120,14 +122,12 @@ namespace repl {
}
void BackgroundSync::notify() {
- {
- boost::unique_lock<boost::mutex> lock(s_instance->_mutex);
+ boost::lock_guard<boost::mutex> lock(_mutex);
- // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting)
- if (s_instance->_buffer.empty()) {
- s_instance->_appliedBuffer = true;
- s_instance->_condvar.notify_all();
- }
+ // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting)
+ if (_buffer.empty()) {
+ _appliedBuffer = true;
+ _condvar.notify_all();
}
}
@@ -152,8 +152,6 @@ namespace repl {
}
void BackgroundSync::_producerThread() {
- OperationContextImpl txn;
-
MemberState state = theReplSet->state();
// we want to pause when the state changes to primary
@@ -170,10 +168,9 @@ namespace repl {
return;
}
- // if this member has an empty oplog, we cannot start syncing
- // Note: This logic is insane, but I will keep it here because if we can't
- // connect the oplogreader for initial sync, it will be unlikely that we can connect
- // the BGSync oplogreader.
+ OperationContextImpl txn;
+
+ // We need to wait until initial sync has started.
if (_replCoord->getMyLastOptime().isNull()) {
sleepsecs(1);
return;
@@ -181,7 +178,7 @@ namespace repl {
// we want to unpause when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
else if (_pause) {
- start();
+ start(&txn);
}
produce(&txn);
@@ -190,13 +187,10 @@ namespace repl {
void BackgroundSync::produce(OperationContext* txn) {
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
- OplogReader r;
-
{
boost::unique_lock<boost::mutex> lock(_mutex);
if (_lastOpTimeFetched.isNull()) {
// then we're initial syncing and we're still waiting for this to be set
- _currentSyncTarget = NULL;
lock.unlock();
sleepsecs(1);
// if there is no one to sync from
@@ -214,43 +208,49 @@ namespace repl {
}
- // find a target to sync from the last op time written
- _replCoord->connectOplogReader(txn, this, &r);
-
+ // find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ lastOpTimeFetched = _lastOpTimeFetched;
+ _syncSourceHost = HostAndPort();
+ }
+ _syncSourceReader.resetConnection();
+ _syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);
{
boost::unique_lock<boost::mutex> lock(_mutex);
// no server found
- if (_currentSyncTarget == NULL) {
+ if (_syncSourceReader.getHost().empty()) {
lock.unlock();
sleepsecs(1);
// if there is no one to sync from
return;
}
lastOpTimeFetched = _lastOpTimeFetched;
+ _syncSourceHost = _syncSourceReader.getHost();
}
- r.tailingQueryGTE(rsoplog, lastOpTimeFetched);
+ _syncSourceReader.tailingQueryGTE(rsoplog, lastOpTimeFetched);
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
- if (!r.haveCursor()) {
+ if (!_syncSourceReader.haveCursor()) {
return;
}
- if (_rollbackIfNeeded(txn, r)) {
+ if (_rollbackIfNeeded(txn, _syncSourceReader)) {
stop();
return;
}
while (!inShutdown()) {
- if (!r.moreInCurrentBatch()) {
+ if (!_syncSourceReader.moreInCurrentBatch()) {
// Check some things periodically
// (whenever we run out of items in the
// current cursor batch)
- int bs = r.currentBatchMessageSize();
+ int bs = _syncSourceReader.currentBatchMessageSize();
if( bs > 0 && bs < BatchIsSmallish ) {
// on a very low latency network, if we don't wait a little, we'll be
// getting ops to write almost one at a time. this will both be expensive
@@ -273,35 +273,32 @@ namespace repl {
}
// re-evaluate quality of sync target
- if (shouldChangeSyncTarget()) {
+ if (shouldChangeSyncSource()) {
return;
}
-
{
//record time for each getmore
TimerHolder batchTimer(&getmoreReplStats);
// This calls receiveMore() on the oplogreader cursor.
// It can wait up to five seconds for more data.
- r.more();
+ _syncSourceReader.more();
}
- networkByteStats.increment(r.currentBatchMessageSize());
+ networkByteStats.increment(_syncSourceReader.currentBatchMessageSize());
- if (!r.moreInCurrentBatch()) {
+ if (!_syncSourceReader.moreInCurrentBatch()) {
// If there is still no data from upstream, check a few more things
// and then loop back for another pass at getting more data
{
boost::unique_lock<boost::mutex> lock(_mutex);
- if (_pause ||
- !_currentSyncTarget ||
- !_currentSyncTarget->hbinfo().hbstate.readable()) {
+ if (_pause) {
return;
}
}
- r.tailCheck();
- if( !r.haveCursor() ) {
+ _syncSourceReader.tailCheck();
+ if( !_syncSourceReader.haveCursor() ) {
LOG(1) << "replSet end syncTail pass" << rsLog;
return;
}
@@ -312,7 +309,7 @@ namespace repl {
// At this point, we are guaranteed to have at least one thing to read out
// of the oplogreader cursor.
- BSONObj o = r.nextSafe().getOwned();
+ BSONObj o = _syncSourceReader.nextSafe().getOwned();
opsReadStats.increment();
{
@@ -330,7 +327,7 @@ namespace repl {
{
boost::unique_lock<boost::mutex> lock(_mutex);
- _lastH = o["h"].numberLong();
+ _lastHash = o["h"].numberLong();
_lastOpTimeFetched = o["ts"]._opTime();
LOG(3) << "replSet lastOpTimeFetched: "
<< _lastOpTimeFetched.toStringPretty() << rsLog;
@@ -338,17 +335,15 @@ namespace repl {
}
}
- bool BackgroundSync::shouldChangeSyncTarget() {
- boost::unique_lock<boost::mutex> lock(_mutex);
-
+ bool BackgroundSync::shouldChangeSyncSource() {
// is it even still around?
- if (!_currentSyncTarget || !_currentSyncTarget->hbinfo().hbstate.readable()) {
+ if (_syncSourceReader.getHost().empty()) {
return true;
}
- // check other members: is any member's optime more than 30 seconds ahead of the guy we're
- // syncing from?
- return theReplSet->shouldChangeSyncTarget(_currentSyncTarget->hbinfo().opTime);
+ // check other members: is any member's optime more than MaxSyncSourceLag seconds
+ // ahead of the current sync source?
+ return _replCoord->shouldChangeSyncSource(_syncSourceReader.getHost());
}
@@ -371,151 +366,24 @@ namespace repl {
bufferSizeGauge.decrement(getSize(op));
}
- bool BackgroundSync::isStale(OplogReader& r, BSONObj& remoteOldestOp) {
+ bool BackgroundSync::isStale(OpTime lastOpTimeFetched,
+ OplogReader& r,
+ BSONObj& remoteOldestOp) {
remoteOldestOp = r.findOne(rsoplog, Query());
OpTime remoteTs = remoteOldestOp["ts"]._opTime();
- DEV {
- log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
- log() << "replSet lastOpTimeFetched: " << _lastOpTimeFetched.toStringLong() << rsLog;
- }
-
{
boost::unique_lock<boost::mutex> lock(_mutex);
- if (_lastOpTimeFetched >= remoteTs) {
+ if (lastOpTimeFetched >= remoteTs) {
return false;
}
+ log() << "replSet remoteOldestOp: " << remoteTs.toStringLong() << rsLog;
+ log() << "replSet lastOpTimeFetched: " << lastOpTimeFetched.toStringLong() << rsLog;
}
return true;
}
- void BackgroundSync::getOplogReaderLegacy(OperationContext* txn, OplogReader* r) {
- const Member *target = NULL, *stale = NULL;
- BSONObj oldest;
-
- verify(r->conn() == NULL);
-
- while ((target = theReplSet->getMemberToSyncTo()) != NULL) {
- string current = target->fullName();
-
- if (!r->connect(target->h())) {
- LOG(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
- r->resetConnection();
- theReplSet->veto(current);
- sleepsecs(1);
- continue;
- }
-
- if (isStale(*r, oldest)) {
- r->resetConnection();
- theReplSet->veto(current, 600);
- stale = target;
- continue;
- }
-
- // if we made it here, the target is up and not stale
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- // this will trigger the syncSourceFeedback
- _currentSyncTarget = target;
- }
-
- return;
- }
-
- // the only viable sync target was stale
- if (stale) {
- theReplSet->goStale(txn, stale, oldest);
- sleepsecs(120);
- }
-
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- _currentSyncTarget = NULL;
- }
-
- }
-
- void BackgroundSync::connectOplogReader(OperationContext* txn,
- ReplicationCoordinatorImpl* replCoordImpl,
- OplogReader* reader) {
- OpTime lastOpTimeFetched;
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- lastOpTimeFetched = _lastOpTimeFetched;
- }
- const OpTime sentinel(Milliseconds(curTimeMillis64()).total_seconds(), 0);
- OpTime oldestOpTimeSeen = sentinel;
-
- while (true) {
- HostAndPort candidate = replCoordImpl->chooseNewSyncSource();
-
- if (candidate.empty()) {
- if (oldestOpTimeSeen == sentinel) {
- // If, in this invocation of connectOplogReader(), we did not successfully
- // connect to any node ahead of us,
- // we apparently have no sync sources to connect to.
- // This situation is common; e.g. if there are no writes to the primary at
- // the moment.
- return;
- }
-
- // Connected to at least one member, but in all cases we were too stale to use them
- // as a sync source.
- log() << "replSet error RS102 too stale to catch up" << rsLog;
- log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog;
- log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() <<
- rsLog;
- log() << "replSet "
- "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
- << rsLog;
- sethbmsg("error RS102 too stale to catch up");
- setMinValid(txn, oldestOpTimeSeen);
- replCoordImpl->setFollowerMode(MemberState::RS_RECOVERING);
- return;
- }
-
- if (!reader->connect(candidate)) {
- LOG(2) << "replSet can't connect to " << candidate.toString() <<
- " to read operations" << rsLog;
- reader->resetConnection();
- replCoordImpl->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000));
- continue;
- }
- // Read the first (oldest) op and confirm that it's not newer than our last
- // fetched op. Otherwise, we have fallen off the back of that source's oplog.
- BSONObj remoteOldestOp(reader->findOne(rsoplog, Query()));
- BSONElement tsElem(remoteOldestOp["ts"]);
- if (tsElem.type() != Timestamp) {
- // This member's got a bad op in its oplog.
- warning() << "oplog invalid format on node " << candidate.toString();
- reader->resetConnection();
- replCoordImpl->blacklistSyncSource(candidate,
- Date_t(curTimeMillis64() + 600*1000));
- continue;
- }
- OpTime remoteOldOpTime = tsElem._opTime();
-
- if (lastOpTimeFetched < remoteOldOpTime) {
- // We're too stale to use this sync source.
- reader->resetConnection();
- replCoordImpl->blacklistSyncSource(candidate,
- Date_t(curTimeMillis64() + 600*1000));
- if (oldestOpTimeSeen > remoteOldOpTime) {
- warning() << "we are too stale to use " << candidate.toString() <<
- " as a sync source";
- oldestOpTimeSeen = remoteOldOpTime;
- }
- continue;
- }
-
- // Got a valid sync source.
- return;
- } // while (true)
-
- }
-
bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) {
string hn = r.conn()->getServerAddress();
@@ -547,8 +415,8 @@ namespace repl {
BSONObj o = r.nextSafe();
OpTime ts = o["ts"]._opTime();
- long long h = o["h"].numberLong();
- if( ts != _lastOpTimeFetched || h != _lastH ) {
+ long long hash = o["h"].numberLong();
+ if( ts != _lastOpTimeFetched || hash != _lastHash ) {
log() << "replSet our last op time fetched: " << _lastOpTimeFetched.toStringPretty() << rsLog;
log() << "replset source's GTE: " << ts.toStringPretty() << rsLog;
syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord);
@@ -558,27 +426,29 @@ namespace repl {
return false;
}
- const Member* BackgroundSync::getSyncTarget() {
+ HostAndPort BackgroundSync::getSyncTarget() {
boost::unique_lock<boost::mutex> lock(_mutex);
- return _currentSyncTarget;
+ return _syncSourceHost;
}
void BackgroundSync::clearSyncTarget() {
boost::unique_lock<boost::mutex> lock(_mutex);
- _currentSyncTarget = NULL;
+ _syncSourceReader.resetConnection();
+ _syncSourceHost = HostAndPort();
}
void BackgroundSync::stop() {
boost::unique_lock<boost::mutex> lock(_mutex);
_pause = true;
- _currentSyncTarget = NULL;
+ _syncSourceReader.resetConnection();
+ _syncSourceHost = HostAndPort();
_lastOpTimeFetched = OpTime(0,0);
- _lastH = 0;
+ _lastHash = 0;
_condvar.notify_all();
}
- void BackgroundSync::start() {
+ void BackgroundSync::start(OperationContext* txn) {
massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty());
boost::unique_lock<boost::mutex> lock(_mutex);
@@ -586,9 +456,11 @@ namespace repl {
// reset _last fields with current data
_lastOpTimeFetched = _replCoord->getMyLastOptime();
- _lastH = theReplSet->lastH;
- LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastH << rsLog;
+ loadLastHash(txn);
+
+ LOG(1) << "replset bgsync fetch queue set to: " << _lastOpTimeFetched <<
+ " " << _lastHash << rsLog;
}
bool BackgroundSync::isAssumingPrimary() {
@@ -611,5 +483,56 @@ namespace repl {
_assumingPrimary = false;
}
+ long long BackgroundSync::getLastHash() const {
+ boost::lock_guard<boost::mutex> lck(_mutex);
+ return _lastHash;
+ }
+
+ void BackgroundSync::setLastHash(long long newHash) {
+ boost::lock_guard<boost::mutex> lck(_mutex);
+ _lastHash = newHash;
+ }
+
+ void BackgroundSync::loadLastHash(OperationContext* txn) {
+ Lock::DBRead lk(txn->lockState(), rsoplog);
+ BSONObj oplogEntry;
+ try {
+ if (!Helpers::getLast(txn, rsoplog, oplogEntry)) {
+ // This can happen when we are to do an initial sync. lastHash will be set
+ // after the initial sync is complete.
+ _lastHash = 0;
+ return;
+ }
+ }
+ catch (const DBException& ex) {
+ severe() << "Problem reading " << rsoplog << ": " << ex.toStatus();
+ fassertFailed(18904);
+ }
+ BSONElement hashElement = oplogEntry[hashFieldName];
+ if (hashElement.eoo()) {
+ severe() << "Most recent entry in " << rsoplog << " missing \"" << hashFieldName <<
+ "\" field";
+ fassertFailed(18902);
+ }
+ if (hashElement.type() != NumberLong) {
+ severe() << "Expected type of \"" << hashFieldName << "\" in most recent " <<
+ rsoplog << " entry to have type NumberLong, but found " <<
+ typeName(hashElement.type());
+ fassertFailed(18903);
+ }
+ _lastHash = hashElement.safeNumberLong();
+ }
+
+ bool BackgroundSync::getInitialSyncRequestedFlag() {
+ boost::lock_guard<boost::mutex> lock(_initialSyncMutex);
+ return _initialSyncRequestedFlag;
+ }
+
+ void BackgroundSync::setInitialSyncRequestedFlag(bool value) {
+ boost::lock_guard<boost::mutex> lock(_initialSyncMutex);
+ _initialSyncRequestedFlag = value;
+ }
+
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index eb88040f64c..45f87731200 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -39,7 +39,6 @@ namespace repl {
class Member;
class ReplicationCoordinator;
- class ReplicationCoordinatorImpl;
// This interface exists to facilitate easier testing;
// the test infrastructure implements these functions with stubs.
@@ -56,12 +55,6 @@ namespace repl {
// called by sync thread after it has applied an op
virtual void consume() = 0;
- // Returns the member we're currently syncing from (or NULL)
- virtual const Member* getSyncTarget() = 0;
-
- // Sets the member we're currently syncing from to be NULL
- virtual void clearSyncTarget() = 0;
-
// wait up to 1 second for more ops to appear
virtual void waitForMore() = 0;
};
@@ -79,21 +72,26 @@ namespace repl {
// protects creation of s_instance
static boost::mutex s_mutex;
- // _mutex protects all of the class variables
- boost::mutex _mutex;
-
// Production thread
BlockingQueue<BSONObj> _buffer;
+ OplogReader _syncSourceReader;
+
+ // _mutex protects all of the class variables except _syncSourceReader and _buffer
+ mutable boost::mutex _mutex;
OpTime _lastOpTimeFetched;
- long long _lastH;
+
+ // hash we use to make sure we are reading the right flow of ops and aren't on
+ // an out-of-date "fork"
+ long long _lastHash;
+
// if produce thread should be running
bool _pause;
bool _appliedBuffer;
bool _assumingPrimary;
boost::condition _condvar;
- const Member* _currentSyncTarget;
+ HostAndPort _syncSourceHost;
BackgroundSync();
BackgroundSync(const BackgroundSync& s);
@@ -107,23 +105,28 @@ namespace repl {
bool _rollbackIfNeeded(OperationContext* txn, OplogReader& r);
// Evaluate if the current sync target is still good
- bool shouldChangeSyncTarget();
+ bool shouldChangeSyncSource();
// check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp.
- bool isStale(OplogReader& r, BSONObj& remoteOldestOp);
- // stop syncing when this becomes a primary
- void stop();
+ bool isStale(OpTime lastOpTimeFetched, OplogReader& r, BSONObj& remoteOldestOp);
// restart syncing
- void start();
+ void start(OperationContext* txn);
// A pointer to the replication coordinator running the show.
ReplicationCoordinator* _replCoord;
+ // bool for indicating resync need on this node and the mutex that protects it
+ // The resync command sets this flag; the Applier thread observes and clears it.
+ bool _initialSyncRequestedFlag;
+ boost::mutex _initialSyncMutex;
+
public:
+ // stop syncing (when this node becomes a primary, e.g.)
+ void stop();
bool isAssumingPrimary();
static BackgroundSync* get();
- static void shutdown();
- static void notify();
+ void shutdown();
+ void notify();
virtual ~BackgroundSync() {}
@@ -132,11 +135,12 @@ namespace repl {
// starts the sync target notifying thread
void notifierThread();
+ HostAndPort getSyncTarget();
+
// Interface implementation
virtual bool peek(BSONObj* op);
virtual void consume();
- virtual const Member* getSyncTarget();
virtual void clearSyncTarget();
virtual void waitForMore();
@@ -147,18 +151,12 @@ namespace repl {
// primary.
void stopReplicationAndFlushBuffer();
- /**
- * Connects an oplog reader to a viable sync source. Legacy uses getOplogReaderLegacy(),
- * which sets _currentSyncTarget as a side effect.
- * connectOplogReader() is used in new replication.
- * Both functions can affect the TopoCoord's blacklist of sync sources, and may set
- * our minValid value, durably, if we detect we haven fallen off the back of all sync
- * sources' oplogs.
- **/
- void getOplogReaderLegacy(OperationContext* txn, OplogReader* reader);
- void connectOplogReader(OperationContext* txn,
- ReplicationCoordinatorImpl* replCoordImpl,
- OplogReader* reader);
+ long long getLastHash() const;
+ void setLastHash(long long oldH);
+ void loadLastHash(OperationContext* txn);
+
+ bool getInitialSyncRequestedFlag();
+ void setInitialSyncRequestedFlag(bool value);
};
diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp
index 76bdedd97d7..5be259383d6 100644
--- a/src/mongo/db/repl/health.cpp
+++ b/src/mongo/db/repl/health.cpp
@@ -448,11 +448,11 @@ namespace repl {
b.append("set", name());
b.appendTimeT("date", time(0));
b.append("myState", myState.s);
- const Member *syncTarget = BackgroundSync::get()->getSyncTarget();
- if ( syncTarget &&
+ const HostAndPort syncTarget = BackgroundSync::get()->getSyncTarget();
+ if ( !syncTarget.empty() &&
(myState != MemberState::RS_PRIMARY) &&
(myState != MemberState::RS_REMOVED) ) {
- b.append("syncingTo", syncTarget->fullName());
+ b.append("syncingTo", syncTarget.toString());
}
b.append("members", v);
if( replSetBlind )
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp
index 54e05e82233..b563b4b6897 100644
--- a/src/mongo/db/repl/heartbeat.cpp
+++ b/src/mongo/db/repl/heartbeat.cpp
@@ -213,13 +213,13 @@ namespace {
if (myConfig().arbiterOnly) {
return;
}
-
+
// this ensures that will have bgsync's s_instance at all points where it is needed
// so that we needn't check for its existence
BackgroundSync* sync = BackgroundSync::get();
boost::thread t(startSyncThread);
-
+
boost::thread producer(stdx::bind(&BackgroundSync::producerThread, sync));
//boost::thread feedback(stdx::bind(&SyncSourceFeedback::run,
// &theReplSet->syncSourceFeedback));
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index f2a9eb30a64..4287580f3d1 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -55,7 +55,6 @@
#include "mongo/db/ops/delete.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/repl_coordinator_global.h"
-#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/write_concern.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/operation_context_impl.h"
@@ -126,7 +125,7 @@ namespace repl {
WriteUnitOfWork wunit(txn);
const OpTime ts = op["ts"]._opTime();
- long long h = op["h"].numberLong();
+ long long hash = op["h"].numberLong();
{
if ( localOplogRSCollection == 0 ) {
@@ -142,29 +141,20 @@ namespace repl {
Client::Context ctx(txn, rsoplog, localDB);
checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false));
- /* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy.
- this code (or code in now() maybe) should be improved.
- */
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) {
- OpTime myLastOptime = replCoord->getMyLastOptime();
- if (!(myLastOptime < ts)) {
- warning() << "replication oplog stream went back in time. previous timestamp: "
- << myLastOptime << " newest timestamp: " << ts
- << ". attempting to sync directly from primary." << endl;
- BSONObjBuilder result;
- HostAndPort targetHostAndPort = theReplSet->box.getPrimary()->h();
- Status status = replCoord->processReplSetSyncFrom(targetHostAndPort, &result);
- if (!status.isOK()) {
- error() << "Can't sync from primary: " << status;
- }
- }
- theReplSet->lastH = h;
- ctx.getClient()->setLastOp( ts );
-
- replCoord->setMyLastOptime(txn, ts);
- BackgroundSync::notify();
+ OpTime myLastOptime = replCoord->getMyLastOptime();
+ if (!(myLastOptime < ts)) {
+ severe() << "replication oplog stream went back in time. previous timestamp: "
+ << myLastOptime << " newest timestamp: " << ts;
+ fassertFailedNoTrace(18905);
}
+
+ BackgroundSync* bgsync = BackgroundSync::get();
+ bgsync->setLastHash(hash);
+ ctx.getClient()->setLastOp( ts );
+
+ replCoord->setMyLastOptime(txn, ts);
+ bgsync->notify();
}
setNewOptime(ts);
@@ -252,26 +242,33 @@ namespace repl {
resetSlaveCache();
return;
}
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
mutex::scoped_lock lk2(newOpMutex);
OpTime ts(getNextGlobalOptime());
newOptimeNotifier.notify_all();
- long long hashNew;
- if( theReplSet ) {
- if (!theReplSet->box.getState().primary()) {
- log() << "replSet error : logOp() but not primary";
- fassertFailed(17405);
- }
- hashNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId();
+ long long hashNew = BackgroundSync::get()->getLastHash();
+
+ // Check to make sure logOp() is legal at this point.
+ if (*opstr == 'n') {
+ // 'n' operations are always logged
+ invariant(*ns == '\0');
+
+ // 'n' operations do not advance the hash, since they are not rolled back
}
else {
- // must be initiation
- verify( *ns == 0 );
- hashNew = 0;
+ if (!replCoord->canAcceptWritesForDatabase(nsToDatabaseSubstring(ns))) {
+ severe() << "replSet error : logOp() but can't accept write to collection " << ns;
+ fassertFailed(17405);
+ }
+
+ // Advance the hash
+ hashNew = (hashNew * 131 + ts.asLL()) * 17 + replCoord->getMyId();
}
+
/* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
instead we do a single copy to the destination position in the memory mapped file.
*/
@@ -305,12 +302,10 @@ namespace repl {
OplogDocWriter writer( partial, obj );
checkOplogInsert( localOplogRSCollection->insertDocument( txn, &writer, false ) );
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
- theReplSet->lastH = hashNew;
- ctx.getClient()->setLastOp( ts );
- replCoord->setMyLastOptime(txn, ts);
- }
+ BackgroundSync::get()->setLastHash(hashNew);
+ ctx.getClient()->setLastOp( ts );
+ replCoord->setMyLastOptime(txn, ts);
+
wunit.commit();
}
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 23997eabeb3..eb542669ccf 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -40,11 +40,14 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/auth/security_key.h"
+#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
-#include "mongo/db/repl/rs.h" // theReplSet
+#include "mongo/db/repl/minvalid.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/repl_coordinator.h"
+#include "mongo/db/repl/rslog.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -135,5 +138,79 @@ namespace repl {
return _host;
}
+ void OplogReader::connectToSyncSource(OperationContext* txn,
+ OpTime lastOpTimeFetched,
+ ReplicationCoordinator* replCoord) {
+ const OpTime sentinel(Milliseconds(curTimeMillis64()).total_seconds(), 0);
+ OpTime oldestOpTimeSeen = sentinel;
+
+ invariant(conn() == NULL);
+
+ while (true) {
+ HostAndPort candidate = replCoord->chooseNewSyncSource();
+
+ if (candidate.empty()) {
+ if (oldestOpTimeSeen == sentinel) {
+ // If, in this invocation of connectToSyncSource(), we did not successfully
+ // connect to any node ahead of us,
+ // we apparently have no sync sources to connect to.
+ // This situation is common; e.g. if there are no writes to the primary at
+ // the moment.
+ return;
+ }
+
+ // Connected to at least one member, but in all cases we were too stale to use them
+ // as a sync source.
+ log() << "replSet error RS102 too stale to catch up" << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog;
+ log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() <<
+ rsLog;
+ log() << "replSet "
+ "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
+ << rsLog;
+ setMinValid(txn, oldestOpTimeSeen);
+ replCoord->setFollowerMode(MemberState::RS_RECOVERING);
+ return;
+ }
+
+ if (!connect(candidate)) {
+ LOG(2) << "replSet can't connect to " << candidate.toString() <<
+ " to read operations" << rsLog;
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000));
+ continue;
+ }
+ // Read the first (oldest) op and confirm that it's not newer than our last
+ // fetched op. Otherwise, we have fallen off the back of that source's oplog.
+ BSONObj remoteOldestOp(findOne(rsoplog, Query()));
+ BSONElement tsElem(remoteOldestOp["ts"]);
+ if (tsElem.type() != Timestamp) {
+ // This member's got a bad op in its oplog.
+ warning() << "oplog invalid format on node " << candidate.toString();
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate,
+ Date_t(curTimeMillis64() + 600*1000));
+ continue;
+ }
+ OpTime remoteOldOpTime = tsElem._opTime();
+
+ if (lastOpTimeFetched < remoteOldOpTime) {
+ // We're too stale to use this sync source.
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate,
+ Date_t(curTimeMillis64() + 600*1000));
+ if (oldestOpTimeSeen > remoteOldOpTime) {
+ warning() << "we are too stale to use " << candidate.toString() <<
+ " as a sync source";
+ oldestOpTimeSeen = remoteOldOpTime;
+ }
+ continue;
+ }
+
+ // Got a valid sync source.
+ return;
+ } // while (true)
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 884276035ee..f4434e596d7 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -40,6 +40,8 @@ namespace mongo {
extern const BSONObj reverseNaturalObj; // { $natural : -1 }
namespace repl {
+ class ReplicationCoordinator;
+
/**
* Authenticates conn using the server's cluster-membership credentials.
*
@@ -131,6 +133,19 @@ namespace repl {
void putBack(BSONObj op) { cursor->putBack(op); }
HostAndPort getHost() const;
+
+ /**
+ * Connects this OplogReader to a valid sync source, using the provided lastOpTimeFetched
+ * and ReplicationCoordinator objects.
+ * If this function fails to connect to a sync source that is viable, this OplogReader
+ * is left unconnected, where this->conn() equals NULL.
+ * In the process of connecting, this function may add items to the repl coordinator's
+ * sync source blacklist.
+ * This function may throw DB exceptions.
+ */
+ void connectToSyncSource(OperationContext* txn,
+ OpTime lastOpTimeFetched,
+ ReplicationCoordinator* replCoord);
};
} // namespace repl
diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h
index 970bf646bba..46fdaaf8a51 100644
--- a/src/mongo/db/repl/repl_coordinator.h
+++ b/src/mongo/db/repl/repl_coordinator.h
@@ -272,6 +272,11 @@ namespace repl {
virtual OID getMyRID() const = 0;
/**
+ * Returns the id for this node as specified in the current replica set configuration.
+ */
+ virtual int getMyId() const = 0;
+
+ /**
* Sets this node into a specific follower mode.
*
* It is an error to call this method if the node's topology coordinator would not
@@ -495,14 +500,26 @@ namespace repl {
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) = 0;
/**
- * Connects an oplog reader to a viable sync source, using BackgroundSync object bgsync.
- * When this function returns, reader is connected to a viable sync source or is left
- * unconnected if no sync sources are currently available. In legacy, bgsync's
- * _currentSyncTarget is also set appropriately.
- **/
- virtual void connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* reader) = 0;
+ * Chooses a viable sync source, or, if none available, returns empty HostAndPort.
+ */
+ virtual HostAndPort chooseNewSyncSource() = 0;
+
+ /**
+ * Blacklists choosing 'host' as a sync source until time 'until'.
+ */
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) = 0;
+
+ /**
+ * Loads the optime from the last op in the oplog into the coordinator's lastOpApplied
+ * value.
+ */
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn) = 0;
+
+ /**
+ * Determines if a new sync source should be considered.
+ * currentSource: the current sync source
+ */
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource) = 0;
protected:
diff --git a/src/mongo/db/repl/repl_coordinator_external_state.h b/src/mongo/db/repl/repl_coordinator_external_state.h
index 73721cd5dd9..2b2fa45acd4 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state.h
@@ -58,16 +58,6 @@ namespace repl {
class GlobalSharedLockAcquirer;
class ScopedLocker;
- /**
- * Structure used to pass around information about oplog entry optimes and h values.
- */
- struct OpTimeAndHash {
- OpTimeAndHash() {}
- OpTimeAndHash(OpTime ot, long long h) : opTime(ot), hash(h) {}
- OpTime opTime;
- long long hash;
- };
-
ReplicationCoordinatorExternalState();
virtual ~ReplicationCoordinatorExternalState();
@@ -121,10 +111,10 @@ namespace repl {
virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config) = 0;
/**
- * Gets the last optime and hash of an operation performed on this host, from stable
+ * Gets the last optime of an operation performed on this host, from stable
* storage.
*/
- virtual StatusWith<OpTimeAndHash> loadLastOpTimeAndHash(OperationContext* txn) = 0;
+ virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn) = 0;
/**
* Returns the HostAndPort of the remote client connected to us that initiated the operation
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
index dd78ae482b4..9ec360369ab 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/connections.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/oplog.h"
@@ -59,7 +60,6 @@ namespace {
const char meCollectionName[] = "local.me";
const char meDatabaseName[] = "local";
const char tsFieldName[] = "ts";
- const char hashFieldName[] = "h";
} // namespace
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() {}
@@ -71,6 +71,11 @@ namespace {
void ReplicationCoordinatorExternalStateImpl::shutdown() {
_syncSourceFeedback.shutdown();
+ BackgroundSync* bgsync = BackgroundSync::get();
+ // bgsync can be null if we shut down prior to installing our initial replset config.
+ if (bgsync) {
+ bgsync->shutdown();
+ }
}
void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake() {
@@ -141,37 +146,35 @@ namespace {
}
}
- StatusWith<ReplicationCoordinatorExternalState::OpTimeAndHash>
- ReplicationCoordinatorExternalStateImpl::loadLastOpTimeAndHash(
+ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
OperationContext* txn) {
try {
Lock::DBRead lk(txn->lockState(), rsoplog);
BSONObj oplogEntry;
if (!Helpers::getLast(txn, rsoplog, oplogEntry)) {
- return StatusWith<OpTimeAndHash>(
+ return StatusWith<OpTime>(
ErrorCodes::NoMatchingDocument,
str::stream() << "Did not find any entries in " << rsoplog);
}
BSONElement tsElement = oplogEntry[tsFieldName];
if (tsElement.eoo()) {
- return StatusWith<OpTimeAndHash>(
+ return StatusWith<OpTime>(
ErrorCodes::NoSuchKey,
str::stream() << "Most recent entry in " << rsoplog << " missing \"" <<
tsFieldName << "\" field");
}
if (tsElement.type() != Timestamp) {
- return StatusWith<OpTimeAndHash>(
+ return StatusWith<OpTime>(
ErrorCodes::TypeMismatch,
str::stream() << "Expected type of \"" << tsFieldName <<
"\" in most recent " << rsoplog <<
" entry to have type Timestamp, but found " << typeName(tsElement.type()));
}
- return StatusWith<OpTimeAndHash>(
- OpTimeAndHash(tsElement._opTime(), oplogEntry[hashFieldName].safeNumberLong()));
+ return StatusWith<OpTime>(tsElement._opTime());
}
catch (const DBException& ex) {
- return StatusWith<OpTimeAndHash>(ex.toStatus());
+ return StatusWith<OpTime>(ex.toStatus());
}
}
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.h b/src/mongo/db/repl/repl_coordinator_external_state_impl.h
index 67740b87735..01ebc44a124 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.h
@@ -50,7 +50,7 @@ namespace repl {
virtual bool isSelf(const HostAndPort& host);
virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn);
virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config);
- virtual StatusWith<OpTimeAndHash> loadLastOpTimeAndHash(OperationContext* txn);
+ virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn);
virtual HostAndPort getClientHostAndPort(const OperationContext* txn);
virtual void closeClientConnections();
virtual ReplicationCoordinatorExternalState::GlobalSharedLockAcquirer*
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
index 85b7f82fdd5..0465ae5cc2a 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
@@ -43,7 +43,7 @@ namespace repl {
ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock()
: _localRsConfigDocument(ErrorCodes::NoMatchingDocument, "No local config document"),
- _lastOpTimeAndHash(ErrorCodes::NoMatchingDocument, "No last oplog entry"),
+ _lastOpTime(ErrorCodes::NoMatchingDocument, "No last oplog entry"),
_canAcquireGlobalSharedLock(true),
_connectionsClosed(false) {
}
@@ -95,16 +95,14 @@ namespace repl {
_localRsConfigDocument = localConfigDocument;
}
- StatusWith<ReplicationCoordinatorExternalState::OpTimeAndHash>
- ReplicationCoordinatorExternalStateMock::loadLastOpTimeAndHash(
- OperationContext* txn) {
- return _lastOpTimeAndHash;
+ StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(
+ OperationContext* txn) {
+ return _lastOpTime;
}
- void ReplicationCoordinatorExternalStateMock::setLastOpTimeAndHash(
- const StatusWith<OpTimeAndHash>& lastApplied) {
-
- _lastOpTimeAndHash = lastApplied;
+ void ReplicationCoordinatorExternalStateMock::setLastOpTime(
+ const StatusWith<OpTime>& lastApplied) {
+ _lastOpTime = lastApplied;
}
void ReplicationCoordinatorExternalStateMock::closeClientConnections() {
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.h b/src/mongo/db/repl/repl_coordinator_external_state_mock.h
index 155ec979000..d10fcc075a2 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.h
@@ -57,7 +57,7 @@ namespace repl {
virtual HostAndPort getClientHostAndPort(const OperationContext* txn);
virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn);
virtual Status storeLocalConfigDocument(OperationContext* txn, const BSONObj& config);
- virtual StatusWith<OpTimeAndHash> loadLastOpTimeAndHash(OperationContext* txn);
+ virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn);
virtual void closeClientConnections();
virtual ReplicationCoordinatorExternalState::GlobalSharedLockAcquirer*
getGlobalSharedLockAcquirer();
@@ -88,11 +88,11 @@ namespace repl {
/**
* Sets the return value for subsequent calls to loadLastOpTimeApplied.
*/
- void setLastOpTimeAndHash(const StatusWith<OpTimeAndHash>& lastApplied);
+ void setLastOpTime(const StatusWith<OpTime>& lastApplied);
private:
StatusWith<BSONObj> _localRsConfigDocument;
- StatusWith<OpTimeAndHash> _lastOpTimeAndHash;
+ StatusWith<OpTime> _lastOpTime;
std::vector<HostAndPort> _selfHosts;
bool _canAcquireGlobalSharedLock;
bool _connectionsClosed;
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index c9555d0a432..253848c92ff 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -190,19 +190,26 @@ namespace repl {
Status HybridReplicationCoordinator::setMyLastOptime(OperationContext* txn, const OpTime& ts) {
Status legacyStatus = _legacy.setMyLastOptime(txn, ts);
- Status implStatus = _impl.setMyLastOptime(txn, ts);
- if (legacyStatus.code() != implStatus.code()) {
- warning() << "Hybrid response difference in setMyLastOptime. Legacy response: "
- << legacyStatus << ", impl response: " << implStatus;
+ // Currently, the legacy code calls logOp before we have a config, which
+ // means we can't set the last optime in the impl because we have no place to store it.
+ if (getReplicationMode() == ReplicationCoordinator::modeReplSet ||
+ getReplicationMode() == ReplicationCoordinator::modeMasterSlave) {
+ Status implStatus = _impl.setMyLastOptime(txn, ts);
+ if (legacyStatus.code() != implStatus.code()) {
+ warning() << "Hybrid response difference in setMyLastOptime. Legacy response: "
+ << legacyStatus << ", impl response: " << implStatus;
+ }
+ fassert(18666, legacyStatus.code() == implStatus.code());
}
- fassert(18666, legacyStatus.code() == implStatus.code());
return legacyStatus;
}
OpTime HybridReplicationCoordinator::getMyLastOptime() const {
- _legacy.getMyLastOptime();
- OpTime implOpTime = _impl.getMyLastOptime();
- return implOpTime;
+ OpTime legacyOpTime = _legacy.getMyLastOptime();
+ _impl.getMyLastOptime();
+ // Returning the legacy one for now, because at startup we can only set the legacy and not
+ // the impl (see comment in setMyLastOptime() above).
+ return legacyOpTime;
}
OID HybridReplicationCoordinator::getElectionId() {
@@ -217,6 +224,10 @@ namespace repl {
return legacyRID;
}
+ int HybridReplicationCoordinator::getMyId() const {
+ return _impl.getMyId();
+ }
+
void HybridReplicationCoordinator::setFollowerMode(const MemberState& newState) {
_legacy.setFollowerMode(newState);
_impl.setFollowerMode(newState);
@@ -430,18 +441,25 @@ namespace repl {
return legacyResponse;
}
- void HybridReplicationCoordinator::connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r) {
- _legacy.connectOplogReader(txn, bgsync, r);
- HostAndPort legacySyncSource = r->getHost();
- bgsync->connectOplogReader(txn, &_impl, r);
- HostAndPort implSyncSource = r->getHost();
- if (legacySyncSource != implSyncSource) {
- severe() << "sync source mismatch between legacy and impl: " <<
- legacySyncSource.toString() << " and " << implSyncSource.toString();
- fassertFailed(18742);
- }
+ HostAndPort HybridReplicationCoordinator::chooseNewSyncSource() {
+ return _legacy.chooseNewSyncSource();
+ //return _impl.chooseNewSyncSource();
+ }
+
+ void HybridReplicationCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+ _legacy.blacklistSyncSource(host, until);
+ _impl.blacklistSyncSource(host, until);
+ }
+
+ void HybridReplicationCoordinator::resetLastOpTimeFromOplog(OperationContext* txn) {
+ _impl.resetLastOpTimeFromOplog(txn);
+ _legacy.resetLastOpTimeFromOplog(txn);
+ }
+
+ bool HybridReplicationCoordinator::shouldChangeSyncSource(const HostAndPort& currentSource) {
+ // Doesn't yet return the correct answer because we have no heartbeats.
+ _impl.shouldChangeSyncSource(currentSource);
+ return _legacy.shouldChangeSyncSource(currentSource);
}
void HybridReplicationCoordinator::setImplConfigHack(const ReplSetConfig* config) {
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h
index d1b1c41e54b..ed33881db34 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.h
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.h
@@ -102,6 +102,8 @@ namespace repl {
virtual OID getMyRID() const;
+ virtual int getMyId() const;
+
virtual void setFollowerMode(const MemberState& newState);
virtual bool isWaitingForApplierToDrain();
@@ -169,9 +171,13 @@ namespace repl {
virtual bool isReplEnabled() const;
- virtual void connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r);
+ virtual HostAndPort chooseNewSyncSource();
+
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn);
+
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
/**
* This is a temporary hack to force _impl to set its replset config to the one loaded by
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 77be9c92a2c..57ef45b0d2b 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -162,15 +162,14 @@ namespace {
return true;
}
- StatusWith<ReplicationCoordinatorExternalState::OpTimeAndHash> lastOpTimeStatus =
- _externalState->loadLastOpTimeAndHash(txn);
+ StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn);
OpTime lastOpTime(0, 0);
if (!lastOpTimeStatus.isOK()) {
warning() << "Failed to load timestamp of most recently applied operation; " <<
lastOpTimeStatus.getStatus();
}
else {
- lastOpTime = lastOpTimeStatus.getValue().opTime;
+ lastOpTime = lastOpTimeStatus.getValue();
}
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
@@ -871,6 +870,16 @@ namespace {
return _myRID;
}
+ int ReplicationCoordinatorImpl::getMyId() const {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ return _getMyId_inlock();
+ }
+
+ int ReplicationCoordinatorImpl::_getMyId_inlock() const {
+ const MemberConfig& self = _rsConfig.getMemberAt(_thisMembersConfigIndex);
+ return self.getId();
+ }
+
void ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(
OperationContext* txn,
BSONObjBuilder* cmdBuilder) {
@@ -1503,12 +1512,6 @@ namespace {
return _settings.usingReplSets() || _settings.master || _settings.slave;
}
- void ReplicationCoordinatorImpl::connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r) {
- invariant(false);
- }
-
void ReplicationCoordinatorImpl::_chooseNewSyncSource(
const ReplicationExecutor::CallbackData& cbData,
HostAndPort* newSyncSource) {
@@ -1557,5 +1560,45 @@ namespace {
_replExecutor.wait(cbh.getValue());
}
+ void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) {
+ StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn);
+ OpTime lastOpTime(0, 0);
+ if (!lastOpTimeStatus.isOK()) {
+ warning() << "Failed to load timestamp of most recently applied operation; " <<
+ lastOpTimeStatus.getStatus();
+ }
+ else {
+ lastOpTime = lastOpTimeStatus.getValue();
+ }
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ _setLastOptime_inlock(&lk, _getMyRID_inlock(), lastOpTime);
+ }
+
+ void ReplicationCoordinatorImpl::_shouldChangeSyncSource(
+ const ReplicationExecutor::CallbackData& cbData,
+ const HostAndPort& currentSource,
+ bool* shouldChange) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ *shouldChange = _topCoord->shouldChangeSyncSource(currentSource);
+ }
+
+ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) {
+ bool shouldChange(false);
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource,
+ this,
+ stdx::placeholders::_1,
+ currentSource,
+ &shouldChange));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return false;
+ }
+ fassert(18906, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ return shouldChange;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index 870c79ebbba..f2fba19bb40 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -136,6 +136,8 @@ namespace repl {
virtual OID getMyRID() const;
+ virtual int getMyId() const;
+
virtual void setFollowerMode(const MemberState& newState);
virtual bool isWaitingForApplierToDrain();
@@ -203,11 +205,14 @@ namespace repl {
virtual bool isReplEnabled() const;
- virtual void connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r);
+ virtual HostAndPort chooseNewSyncSource();
+
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn);
+
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
-
// ================== Members of replication code internal API ===================
// This is a temporary hack to set the replset config to the config detected by the
@@ -231,21 +236,6 @@ namespace repl {
*/
void cancelHeartbeats();
- /**
- * Chooses a sync source.
- * A wrapper that schedules _chooseNewSyncSource() through the Replication Executor and
- * waits for its completion.
- */
- HostAndPort chooseNewSyncSource();
-
- /**
- * Blacklists 'host' until 'until'.
- * A wrapper that schedules _blacklistSyncSource() through the Replication Executor and
- * waits for its completion.
- */
- void blacklistSyncSource(const HostAndPort& host, Date_t until);
-
-
// ================== Test support API ===================
/**
@@ -408,6 +398,8 @@ namespace repl {
OID _getMyRID_inlock() const;
+ int _getMyId_inlock() const;
+
/**
* Bottom half of setFollowerMode.
*/
@@ -532,7 +524,14 @@ namespace repl {
void _blacklistSyncSource(const ReplicationExecutor::CallbackData& cbData,
const HostAndPort& host,
Date_t until);
-
+ /**
+ * Determines if a new sync source should be considered.
+ *
+ * Must be scheduled as a callback.
+ */
+ void _shouldChangeSyncSource(const ReplicationExecutor::CallbackData& cbData,
+ const HostAndPort& currentSource,
+ bool* shouldChange);
//
// All member variables are labeled with one of the following codes indicating the
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index 48f0797a483..8df815c60ed 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -99,9 +99,6 @@ namespace repl {
}
void LegacyReplicationCoordinator::shutdown() {
- if (getReplicationMode() == modeReplSet) {
- theReplSet->shutdown();
- }
}
ReplSettings& LegacyReplicationCoordinator::getSettings() {
@@ -424,7 +421,9 @@ namespace {
boost::lock_guard<boost::mutex> lock(_mutex);
SlaveOpTimeMap::const_iterator it(_slaveOpTimeMap.find(_myRID));
- invariant(it != _slaveOpTimeMap.end());
+ if (it == _slaveOpTimeMap.end()) {
+ return OpTime(0,0);
+ }
OpTime legacyMapOpTime = it->second;
OpTime legacyOpTime = theReplSet->lastOpTimeWritten;
// TODO(emilkie): SERVER-15209
@@ -444,6 +443,11 @@ namespace {
return _myRID;
}
+ int LegacyReplicationCoordinator::getMyId() const {
+ invariant(false);
+ return 0;
+ }
+
void LegacyReplicationCoordinator::setFollowerMode(const MemberState& newState) {
theReplSet->changeState(newState);
}
@@ -600,9 +604,9 @@ namespace {
response->setHbMsg(theReplSet->hbmsg());
response->setTime(Seconds(time(0)));
response->setOpTime(theReplSet->lastOpTimeWritten.asDate());
- const Member *syncTarget = BackgroundSync::get()->getSyncTarget();
- if (syncTarget) {
- response->setSyncingTo(syncTarget->fullName());
+ const HostAndPort syncTarget = BackgroundSync::get()->getSyncTarget();
+ if (!syncTarget.empty()) {
+ response->setSyncingTo(syncTarget.toString());
}
int v = theReplSet->config().version;
@@ -1028,12 +1032,27 @@ namespace {
return _settings.usingReplSets() || _settings.slave || _settings.master;
}
- void LegacyReplicationCoordinator::connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r) {
- bgsync->getOplogReaderLegacy(txn, r);
+ HostAndPort LegacyReplicationCoordinator::chooseNewSyncSource() {
+ const Member* member = theReplSet->getMemberToSyncTo();
+ if (member) {
+ return member->h();
+ }
+ else {
+ return HostAndPort();
+ }
+ }
+
+ void LegacyReplicationCoordinator::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+ theReplSet->veto(host.toString(), until);
}
+ void LegacyReplicationCoordinator::resetLastOpTimeFromOplog(OperationContext* txn) {
+ theReplSet->loadLastOpTimeWritten(txn, false);
+ }
+
+ bool LegacyReplicationCoordinator::shouldChangeSyncSource(const HostAndPort& currentSource) {
+ return theReplSet->shouldChangeSyncTarget(currentSource);
+ }
} // namespace repl
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h
index 8d724ba469c..3d354c60ef5 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.h
+++ b/src/mongo/db/repl/repl_coordinator_legacy.h
@@ -98,6 +98,8 @@ namespace repl {
virtual OID getMyRID() const;
+ virtual int getMyId() const;
+
virtual void setFollowerMode(const MemberState& newState);
virtual bool isWaitingForApplierToDrain();
@@ -165,9 +167,13 @@ namespace repl {
virtual bool isReplEnabled() const;
- virtual void connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r);
+ virtual HostAndPort chooseNewSyncSource();
+
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn);
+
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
private:
diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp
index 686ca9cebad..51353d897ed 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_mock.cpp
@@ -141,6 +141,10 @@ namespace repl {
return OID();
}
+ int ReplicationCoordinatorMock::getMyId() const {
+ return 0;
+ }
+
void ReplicationCoordinatorMock::setFollowerMode(const MemberState& newState) {
}
@@ -262,9 +266,20 @@ namespace repl {
return Status::OK();
}
- void ReplicationCoordinatorMock::connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r) {
+ HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource() {
+ invariant(false);
+ return HostAndPort();
+ }
+
+ void ReplicationCoordinatorMock::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+ invariant(false);
+ }
+
+ void ReplicationCoordinatorMock::resetLastOpTimeFromOplog(OperationContext* txn) {
+ invariant(false);
+ }
+
+ bool ReplicationCoordinatorMock::shouldChangeSyncSource(const HostAndPort& currentSource) {
invariant(false);
}
diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h
index e7c991a734e..63f7680ede2 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_mock.h
@@ -99,6 +99,8 @@ namespace repl {
virtual OID getMyRID() const;
+ virtual int getMyId() const;
+
virtual void setFollowerMode(const MemberState& newState);
virtual bool isWaitingForApplierToDrain();
@@ -164,9 +166,13 @@ namespace repl {
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
- virtual void connectOplogReader(OperationContext* txn,
- BackgroundSync* bgsync,
- OplogReader* r);
+ virtual HostAndPort chooseNewSyncSource();
+
+ virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+ virtual void resetLastOpTimeFromOplog(OperationContext* txn);
+
+ virtual bool shouldChangeSyncSource(const HostAndPort& currentSource);
private:
diff --git a/src/mongo/db/repl/repl_set.h b/src/mongo/db/repl/repl_set.h
index bb70c577627..f53a2fdcfbe 100644
--- a/src/mongo/db/repl/repl_set.h
+++ b/src/mongo/db/repl/repl_set.h
@@ -55,7 +55,6 @@ namespace repl {
/* call after constructing to start - returns fairly quickly after launching its threads */
void go() { _go(); }
- void shutdown();
virtual bool isPrimary() { return box.getState().primary(); }
virtual bool isSecondary() { return box.getState().secondary(); }
diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp
index ec066932801..ee6e5f240d0 100644
--- a/src/mongo/db/repl/repl_set_impl.cpp
+++ b/src/mongo/db/repl/repl_set_impl.cpp
@@ -385,7 +385,6 @@ namespace {
_cfg = 0;
memset(_hbmsg, 0, sizeof(_hbmsg));
strcpy(_hbmsg , "initial startup");
- lastH = 0;
changeState(MemberState::RS_STARTUP);
_seeds = &replSetSeedList.seeds;
@@ -437,7 +436,6 @@ namespace {
_self(0),
_maintenanceMode(0),
mgr(0),
- initialSyncRequested(false), // only used for resync
_indexPrefetchConfig(PREFETCH_ALL) {
}
@@ -445,7 +443,6 @@ namespace {
Lock::DBRead lk(txn->lockState(), rsoplog);
BSONObj o;
if (Helpers::getLast(txn, rsoplog, o)) {
- lastH = o["h"].numberLong();
OpTime lastOpTime = o["ts"]._opTime();
uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTime.isNull());
getGlobalReplicationCoordinator()->setMyLastOptime(txn, lastOpTime);
@@ -469,6 +466,8 @@ namespace {
OperationContextImpl txn;
try {
+ // Note: this sets lastOpTimeWritten, which the Applier uses to determine whether to
+ // do an initial sync or not.
loadLastOpTimeWritten(&txn);
}
catch (std::exception& e) {
@@ -494,6 +493,7 @@ namespace {
}
}
+
getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_STARTUP2);
startThreads();
newReplUp(); // oplog.cpp
@@ -877,6 +877,125 @@ namespace {
startupStatusMsg.set("? started");
startupStatus = STARTED;
}
+ const Member* ReplSetImpl::getMemberToSyncTo() {
+ lock lk(this);
+
+ // if we have a target we've requested to sync from, use it
+
+ if (_forceSyncTarget) {
+ Member* target = _forceSyncTarget;
+ _forceSyncTarget = 0;
+ sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0);
+ return target;
+ }
+
+ const Member* primary = box.getPrimary();
+
+ // wait for 2N pings before choosing a sync target
+ if (_cfg) {
+ int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings;
+
+ if (needMorePings > 0) {
+ OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl;
+ return NULL;
+ }
+
+ // If we are only allowed to sync from the primary, return that
+ if (!_cfg->chainingAllowed()) {
+ // Returns NULL if we cannot reach the primary
+ return primary;
+ }
+ }
+
+ // find the member with the lowest ping time that has more data than me
+
+ // Find primary's oplog time. Reject sync candidates that are more than
+ // maxSyncSourceLagSecs seconds behind.
+ OpTime primaryOpTime;
+ if (primary)
+ primaryOpTime = primary->hbinfo().opTime;
+ else
+ // choose a time that will exclude no candidates, since we don't see a primary
+ primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
+
+ if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) {
+ // erh - I think this means there was just a new election
+ // and we don't yet know the new primary's optime
+ primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
+ }
+
+ OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0);
+
+ Member *closest = 0;
+ time_t now = 0;
+
+ // Make two attempts. The first attempt, we ignore those nodes with
+ // slave delay higher than our own. The second attempt includes such
+ // nodes, in case those are the only ones we can reach.
+ // This loop attempts to set 'closest'.
+ for (int attempts = 0; attempts < 2; ++attempts) {
+ for (Member *m = _members.head(); m; m = m->next()) {
+ if (!m->syncable())
+ continue;
+
+ if (m->state() == MemberState::RS_SECONDARY) {
+ // only consider secondaries that are ahead of where we are
+ if (m->hbinfo().opTime <= lastOpTimeWritten)
+ continue;
+ // omit secondaries that are excessively behind, on the first attempt at least.
+ if (attempts == 0 &&
+ m->hbinfo().opTime < oldestSyncOpTime)
+ continue;
+ }
+
+ // omit nodes that are more latent than anything we've already considered
+ if (closest &&
+ (m->hbinfo().ping > closest->hbinfo().ping))
+ continue;
+
+ if (attempts == 0 &&
+ (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) {
+ continue; // skip this one in the first attempt
+ }
+
+ map<string,time_t>::iterator vetoed = _veto.find(m->fullName());
+ if (vetoed != _veto.end()) {
+ // Do some veto housekeeping
+ if (now == 0) {
+ now = time(0);
+ }
+
+ // if this was on the veto list, check if it was vetoed in the last "while".
+ // if it was, skip.
+ if (vetoed->second >= now) {
+ if (time(0) % 5 == 0) {
+ log() << "replSet not trying to sync from " << (*vetoed).first
+ << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog;
+ }
+ continue;
+ }
+ _veto.erase(vetoed);
+ // fall through, this is a valid candidate now
+ }
+ // This candidate has passed all tests; set 'closest'
+ closest = m;
+ }
+ if (closest) break; // no need for second attempt
+ }
+
+ if (!closest) {
+ return NULL;
+ }
+
+ sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);
+
+ return closest;
+ }
+
+ void ReplSetImpl::veto(const string& host, const Date_t until) {
+ lock lk(this);
+ _veto[host] = until.toTimeT();
+ }
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h
index f6ccc591c9d..5664f6b1638 100644
--- a/src/mongo/db/repl/repl_set_impl.h
+++ b/src/mongo/db/repl/repl_set_impl.h
@@ -90,19 +90,16 @@ namespace repl {
OpTime lastOpTimeWritten;
OpTime getEarliestOpTimeWritten() const;
- // hash we use to make sure we are reading the right flow of ops and aren't on
- // an out-of-date "fork"
- long long lastH;
Status forceSyncFrom(const string& host, BSONObjBuilder* result);
// Check if the current sync target is suboptimal. This must be called while holding a mutex
// that prevents the sync source from changing.
- bool shouldChangeSyncTarget(const OpTime& target) const;
+ bool shouldChangeSyncTarget(const HostAndPort& target) const;
/**
* Find the closest member (using ping time) with a higher latest optime.
*/
const Member* getMemberToSyncTo();
- void veto(const string& host, unsigned secs=10);
+ void veto(const string& host, Date_t until);
bool gotForceSync();
void goStale(OperationContext* txn, const Member* m, const BSONObj& o);
@@ -292,15 +289,14 @@ namespace repl {
OplogReader* r,
const Member* source);
void _initialSync();
- void syncDoInitialSync();
void _syncThread();
void syncTail();
void syncFixUp(OperationContext* txn, FixUpInfo& h, OplogReader& r);
+ public:
// keep a list of hosts that we've tried recently that didn't work
map<string,time_t> _veto;
- public:
// Allow index prefetching to be turned on/off
enum IndexPrefetchConfig {
PREFETCH_NONE=0, PREFETCH_ID_ONLY=1, PREFETCH_ALL=2
@@ -312,10 +308,8 @@ namespace repl {
IndexPrefetchConfig getIndexPrefetchConfig() {
return _indexPrefetchConfig;
}
-
const ReplSetConfig::MemberCfg& myConfig() const { return _config; }
- void tryToGoLiveAsASecondary(OperationContext* txn);
void syncThread();
const OpTime lastOtherOpTime() const;
/**
@@ -323,16 +317,9 @@ namespace repl {
*/
const OpTime lastOtherElectableOpTime() const;
- // bool for indicating resync need on this node and the mutex that protects it
- bool initialSyncRequested;
- boost::mutex initialSyncMutex;
-
BSONObj getLastErrorDefault;
private:
IndexPrefetchConfig _indexPrefetchConfig;
-
- static const char* _initialSyncFlagString;
- static const BSONObj _initialSyncFlag;
};
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
index 440996e95e0..026814ee39e 100644
--- a/src/mongo/db/repl/rs.cpp
+++ b/src/mongo/db/repl/rs.cpp
@@ -135,10 +135,6 @@ namespace repl {
cc().shutdown();
}
- void ReplSet::shutdown() {
- BackgroundSync::shutdown();
- }
-
void replLocalAuth() {
cc().getAuthorizationSession()->grantInternalAuthorization();
}
diff --git a/src/mongo/db/repl/rs_config.cpp b/src/mongo/db/repl/rs_config.cpp
index 9017d7f715c..52637f3c9be 100644
--- a/src/mongo/db/repl/rs_config.cpp
+++ b/src/mongo/db/repl/rs_config.cpp
@@ -90,8 +90,6 @@ namespace {
{
Client::WriteContext cx(txn, rsConfigNs);
- //theReplSet->lastOpTimeWritten = ??;
- //rather than above, do a logOp()? probably
Helpers::putSingletonGod(txn,
rsConfigNs.c_str(),
newConfigBSON,
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index a4d8f77045a..5709f6eb98c 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/rs.h"
+#include "mongo/db/repl/rs_initialsync.h"
#include "mongo/bson/optime.h"
#include "mongo/db/auth/authorization_manager.h"
@@ -42,7 +42,6 @@
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/initial_sync.h"
-#include "mongo/db/repl/member.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
@@ -52,50 +51,14 @@
#include "mongo/util/mongoutils/str.h"
namespace mongo {
-
namespace repl {
+namespace {
- using namespace mongoutils;
-
- // add try/catch with sleep
-
- void isyncassert(const string& msg, bool expr) {
- if( !expr ) {
- string m = str::stream() << "initial sync " << msg;
- theReplSet->sethbmsg(m, 0);
- uasserted(13404, m);
- }
- }
-
- void ReplSetImpl::syncDoInitialSync() {
- static const int maxFailedAttempts = 10;
-
- OperationContextImpl txn;
- createOplog(&txn);
-
- int failedAttempts = 0;
- while ( failedAttempts < maxFailedAttempts ) {
- try {
- _initialSync();
- break;
- }
- catch(DBException& e) {
- failedAttempts++;
- str::stream msg;
- msg << "initial sync exception: ";
- msg << e.toString() << " " << (maxFailedAttempts - failedAttempts) << " attempts remaining" ;
- sethbmsg(msg, 0);
- sleepsecs(30);
- }
- }
- fassert( 16233, failedAttempts < maxFailedAttempts);
- }
-
- bool ReplSetImpl::_initialSyncClone(OperationContext* txn,
- Cloner& cloner,
- const std::string& host,
- const list<string>& dbs,
- bool dataPass) {
+ bool _initialSyncClone(OperationContext* txn,
+ Cloner& cloner,
+ const std::string& host,
+ const list<string>& dbs,
+ bool dataPass) {
for( list<string>::const_iterator i = dbs.begin(); i != dbs.end(); i++ ) {
const string db = *i;
@@ -103,9 +66,9 @@ namespace repl {
continue;
if ( dataPass )
- sethbmsg( str::stream() << "initial sync cloning db: " << db , 0);
+ log() << "initial sync cloning db: " << db;
else
- sethbmsg( str::stream() << "initial sync cloning indexes for : " << db , 0);
+ log() << "initial sync cloning indexes for : " << db;
string err;
int errCode;
@@ -124,10 +87,9 @@ namespace repl {
Lock::DBLock dbWrite(txn->lockState(), db, newlm::MODE_X);
if (!cloner.go(txn, db, host, options, NULL, err, &errCode)) {
- sethbmsg(str::stream() << "initial sync: error while "
- << (dataPass ? "cloning " : "indexing ") << db
- << ". " << (err.empty() ? "" : err + ". ")
- << "sleeping 5 minutes" ,0);
+ log() << "initial sync: error while "
+ << (dataPass ? "cloning " : "indexing ") << db
+ << ". " << (err.empty() ? "" : err + ". ");
return false;
}
}
@@ -135,140 +97,6 @@ namespace repl {
return true;
}
- static void emptyOplog(OperationContext* txn) {
- Client::WriteContext ctx(txn, rsoplog);
-
- Collection* collection = ctx.ctx().db()->getCollection(txn, rsoplog);
-
- // temp
- if( collection->numRecords(txn) == 0 )
- return; // already empty, ok.
-
- LOG(1) << "replSet empty oplog" << rsLog;
- uassertStatusOK( collection->truncate(txn) );
- ctx.commit();
- }
-
- const Member* ReplSetImpl::getMemberToSyncTo() {
- lock lk(this);
-
- // if we have a target we've requested to sync from, use it
-
- if (_forceSyncTarget) {
- Member* target = _forceSyncTarget;
- _forceSyncTarget = 0;
- sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0);
- return target;
- }
-
- const Member* primary = box.getPrimary();
-
- // wait for 2N pings before choosing a sync target
- if (_cfg) {
- int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings;
-
- if (needMorePings > 0) {
- OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl;
- return NULL;
- }
-
- // If we are only allowed to sync from the primary, return that
- if (!_cfg->chainingAllowed()) {
- // Returns NULL if we cannot reach the primary
- return primary;
- }
- }
-
- // find the member with the lowest ping time that has more data than me
-
- // Find primary's oplog time. Reject sync candidates that are more than
- // maxSyncSourceLagSecs seconds behind.
- OpTime primaryOpTime;
- if (primary)
- primaryOpTime = primary->hbinfo().opTime;
- else
- // choose a time that will exclude no candidates, since we don't see a primary
- primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
-
- if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) {
- // erh - I think this means there was just a new election
- // and we don't yet know the new primary's optime
- primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
- }
-
- OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0);
-
- Member *closest = 0;
- time_t now = 0;
-
- // Make two attempts. The first attempt, we ignore those nodes with
- // slave delay higher than our own. The second attempt includes such
- // nodes, in case those are the only ones we can reach.
- // This loop attempts to set 'closest'.
- for (int attempts = 0; attempts < 2; ++attempts) {
- for (Member *m = _members.head(); m; m = m->next()) {
- if (!m->syncable())
- continue;
-
- if (m->state() == MemberState::RS_SECONDARY) {
- // only consider secondaries that are ahead of where we are
- if (m->hbinfo().opTime <= lastOpTimeWritten)
- continue;
- // omit secondaries that are excessively behind, on the first attempt at least.
- if (attempts == 0 &&
- m->hbinfo().opTime < oldestSyncOpTime)
- continue;
- }
-
- // omit nodes that are more latent than anything we've already considered
- if (closest &&
- (m->hbinfo().ping > closest->hbinfo().ping))
- continue;
-
- if (attempts == 0 &&
- (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) {
- continue; // skip this one in the first attempt
- }
-
- map<string,time_t>::iterator vetoed = _veto.find(m->fullName());
- if (vetoed != _veto.end()) {
- // Do some veto housekeeping
- if (now == 0) {
- now = time(0);
- }
-
- // if this was on the veto list, check if it was vetoed in the last "while".
- // if it was, skip.
- if (vetoed->second >= now) {
- if (time(0) % 5 == 0) {
- log() << "replSet not trying to sync from " << (*vetoed).first
- << ", it is vetoed for " << ((*vetoed).second - now) << " more seconds" << rsLog;
- }
- continue;
- }
- _veto.erase(vetoed);
- // fall through, this is a valid candidate now
- }
- // This candidate has passed all tests; set 'closest'
- closest = m;
- }
- if (closest) break; // no need for second attempt
- }
-
- if (!closest) {
- return NULL;
- }
-
- sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);
-
- return closest;
- }
-
- void ReplSetImpl::veto(const string& host, const unsigned secs) {
- lock lk(this);
- _veto[host] = time(0)+secs;
- }
-
/**
* Replays the sync target's oplog from lastOp to the latest op on the sync target.
*
@@ -277,11 +105,10 @@ namespace repl {
* @param source the sync target
* @return if applying the oplog succeeded
*/
- bool ReplSetImpl::_initialSyncApplyOplog( OperationContext* ctx,
- repl::SyncTail& syncer,
- OplogReader* r,
- const Member* source) {
- const OpTime startOpTime = lastOpTimeWritten;
+ bool _initialSyncApplyOplog( OperationContext* ctx,
+ repl::SyncTail& syncer,
+ OplogReader* r) {
+ const OpTime startOpTime = getGlobalReplicationCoordinator()->getMyLastOptime();
BSONObj lastOp;
try {
// It may have been a long time since we last used this connection to
@@ -291,16 +118,22 @@ namespace repl {
// Solution is to increase the TCP keepalive frequency.
lastOp = r->getLastOp(rsoplog);
} catch ( SocketException & ) {
- log() << "connection lost to " << source->h().toString() << "; is your tcp keepalive interval set appropriately?";
- if( !r->connect(source->h()) ) {
- sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0);
+ HostAndPort host = r->getHost();
+ log() << "connection lost to " << host.toString() <<
+ "; is your tcp keepalive interval set appropriately?";
+ if( !r->connect(host) ) {
+ error() << "initial sync couldn't connect to " << host.toString();
throw;
}
// retry
lastOp = r->getLastOp(rsoplog);
}
- isyncassert( "lastOp is empty ", !lastOp.isEmpty() );
+ if (lastOp.isEmpty()) {
+ error() << "initial sync lastOp is empty";
+ sleepsecs(1);
+ return false;
+ }
OpTime stopOpTime = lastOp["ts"]._opTime();
@@ -320,7 +153,7 @@ namespace repl {
<< rsLog;
getGlobalReplicationCoordinator()->setMyLastOptime(ctx, OpTime());
- lastH = 0;
+ BackgroundSync::get()->setLastHash(0);
sleepsecs(5);
return false;
@@ -350,42 +183,36 @@ namespace repl {
* this member should have consistent data. 8 is "cosmetic," it is only to get this member
* closer to the latest op time before it can transition out of startup state
*/
- void ReplSetImpl::_initialSync() {
- InitialSync init(BackgroundSync::get());
- SyncTail tail(BackgroundSync::get(), multiSyncApply);
- sethbmsg("initial sync pending",0);
-
- // if this is the first node, it may have already become primary
- if ( box.getState().primary() ) {
- sethbmsg("I'm already primary, no need for initial sync",0);
- return;
- }
-
- const Member *source = getMemberToSyncTo();
- if (!source) {
- sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0);
- sleepsecs(15);
- return;
- }
+ void _initialSync() {
+ BackgroundSync* bgsync(BackgroundSync::get());
+ InitialSync init(bgsync);
+ SyncTail tail(bgsync, multiSyncApply);
+ log() << "initial sync pending";
- string sourceHostname = source->h().toString();
- init.setHostname(sourceHostname);
OplogReader r;
- if( !r.connect(source->h()) ) {
- sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0);
- sleepsecs(15);
+ OpTime now(Milliseconds(curTimeMillis64()).total_seconds(), 0);
+ OperationContextImpl txn;
+
+ ReplicationCoordinator* replCoord(getGlobalReplicationCoordinator());
+
+ // We must prime the sync source selector so that it considers all candidates regardless
+ // of oplog position, by passing in "now" as the last op fetched time.
+ r.connectToSyncSource(&txn, now, replCoord);
+ if (r.getHost().empty()) {
+ log() << "no valid sync sources found in current replset to do an initial sync";
+ sleepsecs(3);
return;
}
+ init.setHostname(r.getHost().toString());
+
BSONObj lastOp = r.getLastOp(rsoplog);
- if( lastOp.isEmpty() ) {
- sethbmsg("initial sync couldn't read remote oplog", 0);
+ if ( lastOp.isEmpty() ) {
+ log() << "initial sync couldn't read remote oplog";
sleepsecs(15);
return;
}
- OperationContextImpl txn;
-
if (getGlobalReplicationCoordinator()->getSettings().fastsync) {
log() << "fastsync: skipping database clone" << rsLog;
@@ -394,54 +221,49 @@ namespace repl {
_logOpObjRS(&txn, lastOp);
return;
}
- else {
- // Add field to minvalid document to tell us to restart initial sync if we crash
- setInitialSyncFlag(&txn);
- sethbmsg("initial sync drop all databases", 0);
- dropAllDatabasesExceptLocal(&txn);
+ // Add field to minvalid document to tell us to restart initial sync if we crash
+ setInitialSyncFlag(&txn);
- sethbmsg("initial sync clone all databases", 0);
+ log() << "initial sync drop all databases";
+ dropAllDatabasesExceptLocal(&txn);
- list<string> dbs = r.conn()->getDatabaseNames();
+ log() << "initial sync clone all databases";
- Cloner cloner;
- if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) {
- veto(source->fullName(), 600);
- sleepsecs(300);
- return;
- }
+ list<string> dbs = r.conn()->getDatabaseNames();
- sethbmsg("initial sync data copy, starting syncup",0);
+ Cloner cloner;
+ if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, true)) {
+ return;
+ }
- // prime oplog
- init.syncApply(&txn, lastOp, false);
- _logOpObjRS(&txn, lastOp);
+ log() << "initial sync data copy, starting syncup";
- log() << "oplog sync 1 of 3" << endl;
- if (!_initialSyncApplyOplog(&txn, init, &r , source)) {
- return;
- }
+ // prime oplog
+ init.syncApply(&txn, lastOp, false);
+ _logOpObjRS(&txn, lastOp);
- // Now we sync to the latest op on the sync target _again_, as we may have recloned ops
- // that were "from the future" compared with minValid. During this second application,
- // nothing should need to be recloned.
- log() << "oplog sync 2 of 3" << endl;
- if (!_initialSyncApplyOplog(&txn, init, &r , source)) {
- return;
- }
- // data should now be consistent
+ log() << "oplog sync 1 of 3" << endl;
+ if (!_initialSyncApplyOplog(&txn, init, &r)) {
+ return;
+ }
- sethbmsg("initial sync building indexes",0);
- if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) {
- veto(source->fullName(), 600);
- sleepsecs(300);
- return;
- }
+ // Now we sync to the latest op on the sync target _again_, as we may have recloned ops
+ // that were "from the future" compared with minValid. During this second application,
+ // nothing should need to be recloned.
+ log() << "oplog sync 2 of 3" << endl;
+ if (!_initialSyncApplyOplog(&txn, init, &r)) {
+ return;
+ }
+ // data should now be consistent
+
+ log() << "initial sync building indexes";
+ if (!_initialSyncClone(&txn, cloner, r.conn()->getServerAddress(), dbs, false)) {
+ return;
}
log() << "oplog sync 3 of 3" << endl;
- if (!_initialSyncApplyOplog(&txn, tail, &r, source)) {
+ if (!_initialSyncApplyOplog(&txn, tail, &r)) {
return;
}
@@ -453,17 +275,12 @@ namespace repl {
return;
}
- sethbmsg("initial sync finishing up",0);
-
- verify( !box.getState().primary() ); // wouldn't make sense if we were.
+ log() << "initial sync finishing up";
{
Client::WriteContext cx(&txn, "local.");
-
- try {
- log() << "replSet set minValid=" << lastOpTimeWritten << rsLog;
- }
- catch(...) { }
+ OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastOptime());
+ log() << "replSet set minValid=" << lastOpTimeWritten << rsLog;
// Initial sync is now complete. Flag this by setting minValid to the last thing
// we synced.
@@ -471,18 +288,39 @@ namespace repl {
// Clear the initial sync flag.
clearInitialSyncFlag(&txn);
+ BackgroundSync::get()->setInitialSyncRequestedFlag(false);
cx.commit();
}
- {
- boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex);
- theReplSet->initialSyncRequested = false;
- }
// If we just cloned & there were no ops applied, we still want the primary to know where
// we're up to
- BackgroundSync::notify();
+ bgsync->notify();
+
+ log() << "initial sync done";
+ }
+} // namespace
+
+ void syncDoInitialSync() {
+ static const int maxFailedAttempts = 10;
+
+ OperationContextImpl txn;
+ createOplog(&txn);
- sethbmsg("initial sync done",0);
+ int failedAttempts = 0;
+ while ( failedAttempts < maxFailedAttempts ) {
+ try {
+ _initialSync();
+ break;
+ }
+ catch(DBException& e) {
+ failedAttempts++;
+ mongoutils::str::stream msg;
+ error() << "initial sync exception: " << e.toString() << " " <<
+ (maxFailedAttempts - failedAttempts) << " attempts remaining";
+ sleepsecs(5);
+ }
+ }
+ fassert( 16233, failedAttempts < maxFailedAttempts);
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_initialsync.h b/src/mongo/db/repl/rs_initialsync.h
new file mode 100644
index 00000000000..659bb5ad577
--- /dev/null
+++ b/src/mongo/db/repl/rs_initialsync.h
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+namespace repl {
+ /**
+ * Begins an initial sync of a node. This drops all data, chooses a sync source,
+ * and runs the cloner from that sync source. The node's state is not changed.
+ */
+ void syncDoInitialSync();
+}
+}
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 349987bf655..bd9411a0feb 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -45,11 +45,12 @@
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_coordinator.h"
-#include "mongo/db/repl/rs.h"
+#include "mongo/db/repl/repl_coordinator_impl.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/util/log.h"
@@ -695,8 +696,10 @@ namespace {
warn = true;
}
- // reset cached lastoptimewritten and h value
- theReplSet->loadLastOpTimeWritten(txn);
+ // Reload the lastOpTimeApplied value in the replcoord and the lastHash value in bgsync
+ // to reflect our new last op.
+ replCoord->resetLastOpTimeFromOplog(txn);
+ BackgroundSync::get()->loadLastHash(txn);
// done
if (warn)
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index d7c9218b6ec..142ecaa69fb 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -35,7 +35,6 @@
#include "third_party/murmurhash3/MurmurHash3.h"
#include "mongo/base/counter.h"
-#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status.h"
@@ -50,6 +49,7 @@
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rs.h"
+#include "mongo/db/repl/rs_initialsync.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/sync_tail.h"
#include "mongo/db/server_parameters.h"
@@ -62,46 +62,6 @@
namespace mongo {
namespace repl {
- /* should be in RECOVERING state on arrival here.
- */
- void ReplSetImpl::tryToGoLiveAsASecondary(OperationContext* txn) {
- if (getGlobalReplicationCoordinator()->getMaintenanceMode()) {
- // we're not actually going live
- return;
- }
-
- lock rsLock( this );
-
- // if we're blocking sync, don't change state
- if (_blockSync) {
- return;
- }
-
- // if we're fsync-and-locked, don't bother checking
- if (lockedForWriting()) {
- return;
- }
-
- Lock::GlobalWrite writeLock(txn->lockState());
-
- // Only state RECOVERING can transition to SECONDARY.
- MemberState state(getGlobalReplicationCoordinator()->getCurrentMemberState());
- if (!state.recovering()) {
- return;
- }
-
- OpTime minvalid = getMinValid(txn);
- if (minvalid > getGlobalReplicationCoordinator()->getMyLastOptime()) {
- sethbmsg(str::stream() << "still syncing, not yet to minValid optime " <<
- minvalid.toString());
- return;
- }
-
- sethbmsg("");
- getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_SECONDARY);
- }
-
-
Status ReplSetImpl::forceSyncFrom(const string& host, BSONObjBuilder* result) {
lock lk(this);
@@ -155,9 +115,9 @@ namespace repl {
}
// record the previous member we were syncing from
- const Member *prev = BackgroundSync::get()->getSyncTarget();
- if (prev) {
- result->append("prevSyncTarget", prev->fullName());
+ const HostAndPort prev = BackgroundSync::get()->getSyncTarget();
+ if (!prev.empty()) {
+ result->append("prevSyncTarget", prev.toString());
}
// finally, set the new target
@@ -170,7 +130,8 @@ namespace repl {
return _forceSyncTarget != 0;
}
- bool ReplSetImpl::shouldChangeSyncTarget(const OpTime& targetOpTime) const {
+ bool ReplSetImpl::shouldChangeSyncTarget(const HostAndPort& currentTarget) const {
+ OpTime targetOpTime = findByName(currentTarget.toString())->hbinfo().opTime;
for (Member *m = _members.head(); m; m = m->next()) {
if (m->syncable() &&
targetOpTime.getSecs()+maxSyncSourceLagSecs < m->hbinfo().opTime.getSecs()) {
@@ -191,12 +152,7 @@ namespace repl {
sleepsecs(1);
return;
}
-
- bool initialSyncRequested = false;
- {
- boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex);
- initialSyncRequested = theReplSet->initialSyncRequested;
- }
+ bool initialSyncRequested = BackgroundSync::get()->getInitialSyncRequestedFlag();
// Check criteria for doing an initial sync:
// 1. If the oplog is empty, do an initial sync
// 2. If minValid has _initialSyncFlag set, do an initial sync
@@ -215,20 +171,9 @@ namespace repl {
}
bool ReplSetImpl::resync(OperationContext* txn, string& errmsg) {
- changeState(MemberState::RS_RECOVERING);
+ getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_STARTUP2);
+ BackgroundSync::get()->setInitialSyncRequestedFlag(true);
- WriteUnitOfWork wunit(txn);
- Client::Context ctx(txn, "local");
-
- ctx.db()->dropCollection(txn, "local.oplog.rs");
- {
- boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex);
- theReplSet->initialSyncRequested = true;
- }
- getGlobalReplicationCoordinator()->setMyLastOptime(txn, OpTime());
- _veto.clear();
-
- wunit.commit();
return true;
}
@@ -262,13 +207,6 @@ namespace repl {
}
void startSyncThread() {
- static int n;
- if( n != 0 ) {
- log() << "replSet ERROR : more than one sync thread?" << rsLog;
- verify( n == 0 );
- }
- n++;
-
Client::initThread("rsSync");
replLocalAuth();
theReplSet->syncThread();
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index bfa7507c18b..81084cee2c7 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/rslog.h"
#include "mongo/db/operation_context.h"
#include "mongo/util/log.h"
+#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -54,8 +55,7 @@ namespace repl {
// used in replAuthenticate
static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
- SyncSourceFeedback::SyncSourceFeedback() : _syncTarget(NULL),
- _positionChanged(false),
+ SyncSourceFeedback::SyncSourceFeedback() : _positionChanged(false),
_handshakeNeeded(false),
_shutdownSignaled(false) {}
SyncSourceFeedback::~SyncSourceFeedback() {}
@@ -253,18 +253,18 @@ namespace repl {
_resetConnection();
continue;
}
- const Member* target = BackgroundSync::get()->getSyncTarget();
+ const HostAndPort target = BackgroundSync::get()->getSyncTarget();
if (_syncTarget != target) {
_resetConnection();
_syncTarget = target;
}
if (!hasConnection()) {
// fix connection if need be
- if (!target) {
+ if (target.empty()) {
sleepmillis(500);
continue;
}
- if (!_connect(&txn, target->h())) {
+ if (!_connect(&txn, target)) {
sleepmillis(500);
continue;
}
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
index fc5a33a8469..fc20aa20ee6 100644
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -35,15 +35,13 @@
#include "mongo/client/constants.h"
#include "mongo/client/dbclientcursor.h"
+#include "mongo/util/net/hostandport.h"
namespace mongo {
-
class OperationContext;
namespace repl {
- class Member;
-
class SyncSourceFeedback {
public:
SyncSourceFeedback();
@@ -100,7 +98,7 @@ namespace repl {
/// TODO(spencer): Remove this once the LegacyReplicationCoordinator is gone.
BSONObj _me;
// the member we are currently syncing from
- const Member* _syncTarget;
+ HostAndPort _syncTarget;
// our connection to our sync target
boost::scoped_ptr<DBClientConnection> _connection;
// protects cond, _shutdownSignaled, and the indicator bools.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index d87d475056f..65d345d3a1b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -35,6 +35,7 @@
#include "third_party/murmurhash3/MurmurHash3.h"
#include "mongo/base/counter.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/curop.h"
@@ -324,12 +325,20 @@ namespace {
// (always checked in the first iteration of this do-while loop, because
// ops is empty)
if (ops.empty() || now > lastTimeChecked) {
- {
- boost::unique_lock<boost::mutex> lock(theReplSet->initialSyncMutex);
- if (theReplSet->initialSyncRequested) {
- // got a resync command
- return;
- }
+ BackgroundSync* bgsync = BackgroundSync::get();
+ if (bgsync->getInitialSyncRequestedFlag()) {
+ // got a resync command
+ Lock::DBWrite lk(txn.lockState(), "local");
+ WriteUnitOfWork wunit(&txn);
+ Client::Context ctx(&txn, "local");
+
+ ctx.db()->dropCollection(&txn, "local.oplog.rs");
+ getGlobalReplicationCoordinator()->setMyLastOptime(&txn, OpTime());
+ theReplSet->_veto.clear();
+ bgsync->stop();
+ wunit.commit();
+
+ return;
}
lastTimeChecked = now;
// can we become secondary?
@@ -389,18 +398,8 @@ namespace {
OpTime minValid = lastOp["ts"]._opTime();
setMinValid(&txn, minValid);
- if (BackgroundSync::get()->isAssumingPrimary()) {
- LOG(1) << "about to apply batch up to optime: "
- << ops.getDeque().back()["ts"]._opTime().toStringPretty();
- }
-
multiApply(ops.getDeque());
- if (BackgroundSync::get()->isAssumingPrimary()) {
- LOG(1) << "about to update oplog to optime: "
- << ops.getDeque().back()["ts"]._opTime().toStringPretty();
- }
-
applyOpsToOplog(&ops.getDeque());
// If we're just testing (no manager), don't keep looping if we exhausted the bgqueue
@@ -494,12 +493,8 @@ namespace {
wunit.commit();
}
- if (BackgroundSync::get()->isAssumingPrimary()) {
- LOG(1) << "notifying BackgroundSync";
- }
-
// Update write concern on primary
- BackgroundSync::notify();
+ BackgroundSync::get()->notify();
return lastOpTime;
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 6729c4d14f3..ee76f5c89b8 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -39,7 +39,6 @@ namespace mongo {
class OperationContext;
namespace repl {
-
class BackgroundSyncInterface;
/**
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index 0f7e5e2c076..b5a90386e1b 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -253,7 +253,7 @@ namespace {
return _syncSource;
}
_syncSource = _currentConfig.getMemberAt(closestIndex).getHostAndPort();
- std::string msg(str::stream() << "syncing to: " << _syncSource.toString(), 0);
+ std::string msg(str::stream() << "syncing from: " << _syncSource.toString(), 0);
_sethbmsg(msg);
log() << msg;
return _syncSource;
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index fc1ee065097..d528a4f8605 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -2625,7 +2625,7 @@ namespace {
ASSERT_EQUALS(OpTime(0,0), response.getOpTime());
ASSERT_EQUALS(Seconds(0).total_milliseconds(), response.getTime().total_milliseconds());
// changed to a syncing message because our sync source changed recently
- ASSERT_EQUALS("syncing to: h2:27017", response.getHbMsg());
+ ASSERT_EQUALS("syncing from: h2:27017", response.getHbMsg());
ASSERT_EQUALS("rs0", response.getReplicaSetName());
ASSERT_EQUALS(1, response.getVersion());
ASSERT_EQUALS(HostAndPort("h2").toString(), response.getSyncingTo());