summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/bgsync.h')
-rw-r--r--src/mongo/db/repl/bgsync.h62
1 files changed, 30 insertions, 32 deletions
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index eb88040f64c..45f87731200 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -39,7 +39,6 @@ namespace repl {
class Member;
class ReplicationCoordinator;
- class ReplicationCoordinatorImpl;
// This interface exists to facilitate easier testing;
// the test infrastructure implements these functions with stubs.
@@ -56,12 +55,6 @@ namespace repl {
// called by sync thread after it has applied an op
virtual void consume() = 0;
- // Returns the member we're currently syncing from (or NULL)
- virtual const Member* getSyncTarget() = 0;
-
- // Sets the member we're currently syncing from to be NULL
- virtual void clearSyncTarget() = 0;
-
// wait up to 1 second for more ops to appear
virtual void waitForMore() = 0;
};
@@ -79,21 +72,26 @@ namespace repl {
// protects creation of s_instance
static boost::mutex s_mutex;
- // _mutex protects all of the class variables
- boost::mutex _mutex;
-
// Production thread
BlockingQueue<BSONObj> _buffer;
+ OplogReader _syncSourceReader;
+
+ // _mutex protects all of the class variables except _syncSourceReader and _buffer
+ mutable boost::mutex _mutex;
OpTime _lastOpTimeFetched;
- long long _lastH;
+
+ // hash we use to make sure we are reading the right flow of ops and aren't on
+ // an out-of-date "fork"
+ long long _lastHash;
+
// if produce thread should be running
bool _pause;
bool _appliedBuffer;
bool _assumingPrimary;
boost::condition _condvar;
- const Member* _currentSyncTarget;
+ HostAndPort _syncSourceHost;
BackgroundSync();
BackgroundSync(const BackgroundSync& s);
@@ -107,23 +105,28 @@ namespace repl {
bool _rollbackIfNeeded(OperationContext* txn, OplogReader& r);
// Evaluate if the current sync target is still good
- bool shouldChangeSyncTarget();
+ bool shouldChangeSyncSource();
// check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp.
- bool isStale(OplogReader& r, BSONObj& remoteOldestOp);
- // stop syncing when this becomes a primary
- void stop();
+ bool isStale(OpTime lastOpTimeFetched, OplogReader& r, BSONObj& remoteOldestOp);
// restart syncing
- void start();
+ void start(OperationContext* txn);
// A pointer to the replication coordinator running the show.
ReplicationCoordinator* _replCoord;
+ // bool for indicating resync need on this node and the mutex that protects it
+ // The resync command sets this flag; the Applier thread observes and clears it.
+ bool _initialSyncRequestedFlag;
+ boost::mutex _initialSyncMutex;
+
public:
+ // stop syncing (when this node becomes a primary, e.g.)
+ void stop();
bool isAssumingPrimary();
static BackgroundSync* get();
- static void shutdown();
- static void notify();
+ void shutdown();
+ void notify();
virtual ~BackgroundSync() {}
@@ -132,11 +135,12 @@ namespace repl {
// starts the sync target notifying thread
void notifierThread();
+ HostAndPort getSyncTarget();
+
// Interface implementation
virtual bool peek(BSONObj* op);
virtual void consume();
- virtual const Member* getSyncTarget();
virtual void clearSyncTarget();
virtual void waitForMore();
@@ -147,18 +151,12 @@ namespace repl {
// primary.
void stopReplicationAndFlushBuffer();
- /**
- * Connects an oplog reader to a viable sync source. Legacy uses getOplogReaderLegacy(),
- * which sets _currentSyncTarget as a side effect.
- * connectOplogReader() is used in new replication.
- * Both functions can affect the TopoCoord's blacklist of sync sources, and may set
- * our minValid value, durably, if we detect we haven fallen off the back of all sync
- * sources' oplogs.
- **/
- void getOplogReaderLegacy(OperationContext* txn, OplogReader* reader);
- void connectOplogReader(OperationContext* txn,
- ReplicationCoordinatorImpl* replCoordImpl,
- OplogReader* reader);
+ long long getLastHash() const;
+ void setLastHash(long long oldH);
+ void loadLastHash(OperationContext* txn);
+
+ bool getInitialSyncRequestedFlag();
+ void setInitialSyncRequestedFlag(bool value);
};