summary refs log tree commit diff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
author Eric Milkie <milkie@10gen.com> 2014-09-08 11:37:42 -0400
committer Eric Milkie <milkie@10gen.com> 2014-09-12 10:39:11 -0400
commit ff1ee391747092e2d03765402c6ab25ba7e1d538 (patch)
tree d0650ad040b4b63ed75de9a0d5a349558dd8658f /src/mongo/db/repl/bgsync.cpp
parent ce737ebed71bc4485180b86832e907d820858664 (diff)
download mongo-ff1ee391747092e2d03765402c6ab25ba7e1d538.tar.gz
SERVER-15089 chooseNewSyncSource hooked up in replication Applier
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 155
1 files changed, 122 insertions, 33 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 0e3ad68352b..1a941483984 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -30,20 +30,23 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/repl/bgsync.h"
+
+#include "mongo/base/counter.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
-#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_coordinator_global.h"
-#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/repl_coordinator_impl.h"
#include "mongo/db/repl/rs.h"
+#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/rslog.h"
+#include "mongo/db/stats/timer_stats.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
-#include "mongo/base/counter.h"
-#include "mongo/db/stats/timer_stats.h"
namespace mongo {
@@ -98,7 +101,8 @@ namespace repl {
_pause(true),
_appliedBuffer(true),
_assumingPrimary(false),
- _currentSyncTarget(NULL) {
+ _currentSyncTarget(NULL),
+ _replCoord(getGlobalReplicationCoordinator()) {
}
BackgroundSync* BackgroundSync::get() {
@@ -188,14 +192,37 @@ namespace repl {
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
OplogReader r;
- OpTime lastOpTimeFetched;
- // find a target to sync from the last op time written
- getOplogReader(txn, r);
- // no server found
{
boost::unique_lock<boost::mutex> lock(_mutex);
+ if (_lastOpTimeFetched.isNull()) {
+ // then we're initial syncing and we're still waiting for this to be set
+ _currentSyncTarget = NULL;
+ lock.unlock();
+ sleepsecs(1);
+ // if there is no one to sync from
+ return;
+ }
+ // Wait until we've applied the ops we have before we choose a sync target
+ while (!_appliedBuffer) {
+ _condvar.wait(lock);
+ }
+ }
+
+ while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
+ sleepmillis(0);
+ }
+
+
+ // find a target to sync from the last op time written
+ _replCoord->connectOplogReader(txn, this, &r);
+
+ OpTime lastOpTimeFetched;
+
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ // no server found
if (_currentSyncTarget == NULL) {
lock.unlock();
sleepsecs(1);
@@ -364,43 +391,25 @@ namespace repl {
return true;
}
- void BackgroundSync::getOplogReader(OperationContext* txn, OplogReader& r) {
+ void BackgroundSync::getOplogReaderLegacy(OperationContext* txn, OplogReader* r) {
const Member *target = NULL, *stale = NULL;
BSONObj oldest;
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- if (_lastOpTimeFetched.isNull()) {
- // then we're initial syncing and we're still waiting for this to be set
- _currentSyncTarget = NULL;
- return;
- }
-
- // Wait until we've applied the ops we have before we choose a sync target
- while (!_appliedBuffer) {
- _condvar.wait(lock);
- }
- }
-
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
- }
-
- verify(r.conn() == NULL);
+ verify(r->conn() == NULL);
while ((target = theReplSet->getMemberToSyncTo()) != NULL) {
string current = target->fullName();
- if (!r.connect(target->h())) {
+ if (!r->connect(target->h())) {
LOG(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
- r.resetConnection();
+ r->resetConnection();
theReplSet->veto(current);
sleepsecs(1);
continue;
}
- if (isStale(r, oldest)) {
- r.resetConnection();
+ if (isStale(*r, oldest)) {
+ r->resetConnection();
theReplSet->veto(current, 600);
stale = target;
continue;
@@ -426,6 +435,86 @@ namespace repl {
boost::unique_lock<boost::mutex> lock(_mutex);
_currentSyncTarget = NULL;
}
+
+ }
+
+    // Tries to leave 'reader' connected to a sync source whose oplog still contains our
+    // last fetched op.
+    //
+    // Candidates are requested one at a time from replCoordImpl->chooseNewSyncSource().
+    // A candidate that cannot be used is disconnected and blacklisted, and the next one is
+    // requested. The function returns when either:
+    //   - a candidate passes every check: 'reader' is left connected to it; or
+    //   - chooseNewSyncSource() returns an empty host: 'reader' has been reset after each
+    //     rejected candidate. If at least one candidate was rejected because we are too stale
+    //     for it, RS102 is logged, minValid is set and the node is moved to RS_RECOVERING.
+    //
+    // txn           - only used here for theReplSet->setMinValid().
+    // replCoordImpl - supplies candidates, records blacklist entries and the follower mode.
+    // reader        - the oplog reader to connect.
+    //
+    // NOTE(review): nothing in this function assigns _currentSyncTarget -- confirm how
+    // callers are expected to tell success from failure.
+    void BackgroundSync::connectOplogReader(OperationContext* txn,
+                                            ReplicationCoordinatorImpl* replCoordImpl,
+                                            OplogReader* reader) {
+        // Snapshot _lastOpTimeFetched under _mutex; the rest of the function works on the copy.
+        OpTime lastOpTimeFetched;
+        {
+            boost::unique_lock<boost::mutex> lock(_mutex);
+            lastOpTimeFetched = _lastOpTimeFetched;
+        }
+        // OpTime(now,0) doubles as a sentinel: oldestOpTimeSeen keeps this initial value unless
+        // a "too stale" candidate with an older first op lowers it further down.
+        Date_t now(curTimeMillis64());
+        OpTime oldestOpTimeSeen(now,0);
+
+        while (true) {
+            HostAndPort candidate = replCoordImpl->chooseNewSyncSource();
+
+            // An empty candidate means there is nothing (left) to try.
+            if (candidate.empty()) {
+                if (oldestOpTimeSeen == OpTime(now,0)) {
+                    // Sentinel untouched: in this invocation of connectOplogReader() no
+                    // candidate was rejected for being too far ahead of us, so we
+                    // apparently have no sync sources to connect to.
+                    // This situation is common; e.g. if there are no writes to the primary at
+                    // the moment.
+                    return;
+                }
+
+                // Connected to at least one member, but in all cases we were too stale to use them
+                // as a sync source. Report it, record the oldest remote optime we saw as
+                // minValid and switch to RS_RECOVERING.
+                log() << "replSet error RS102 too stale to catch up" << rsLog;
+                log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog;
+                log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() <<
+                    rsLog;
+                log() << "replSet "
+                    "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
+                      << rsLog;
+                sethbmsg("error RS102 too stale to catch up");
+                theReplSet->setMinValid(txn, oldestOpTimeSeen);
+                replCoordImpl->setFollowerMode(MemberState::RS_RECOVERING);
+                return;
+            }
+
+            if (!reader->connect(candidate)) {
+                LOG(2) << "replSet can't connect to " << candidate.toString() <<
+                    " to read operations" << rsLog;
+                reader->resetConnection();
+                // Blacklist until now + 10*1000 (presumably milliseconds, i.e. 10 seconds --
+                // confirm against blacklistSyncSource()).
+                replCoordImpl->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000));
+                continue;
+            }
+            // Read the first (oldest) op and confirm that it's not newer than our last
+            // fetched op. Otherwise, we have fallen off the back of that source's oplog.
+            BSONObj remoteOldestOp(reader->findOne(rsoplog, Query()));
+            BSONElement tsElem(remoteOldestOp["ts"]);
+            if (tsElem.type() != Timestamp) {
+                // The fetched op has no Timestamp-typed "ts" field, so this member's oplog
+                // cannot be used. Blacklist until now + 600*1000 (presumably 10 minutes).
+                warning() << "oplog invalid format on node " << candidate.toString();
+                reader->resetConnection();
+                replCoordImpl->blacklistSyncSource(candidate,
+                                                   Date_t(curTimeMillis64() + 600*1000));
+                continue;
+            }
+            OpTime remoteOldOpTime = tsElem._opTime();
+
+            if (lastOpTimeFetched < remoteOldOpTime) {
+                // We're too stale to use this sync source.
+                reader->resetConnection();
+                replCoordImpl->blacklistSyncSource(candidate,
+                                                   Date_t(curTimeMillis64() + 600*1000));
+                // Track the smallest "oldest op" seen across rejected candidates. Note that the
+                // warning is only logged when this candidate lowers that value.
+                if (oldestOpTimeSeen > remoteOldOpTime) {
+                    warning() << "we are too stale to use " << candidate.toString() << 
+                        " as a sync source";
+                    oldestOpTimeSeen = remoteOldOpTime;
+                }
+                continue;
+            }
+
+            // Got a valid sync source; 'reader' stays connected to it.
+            return;
+        } // while (true)
+
+    }
bool BackgroundSync::isRollbackRequired(OperationContext* txn, OplogReader& r) {