summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/bgsync.cpp64
-rw-r--r--src/mongo/db/repl/bgsync.h12
-rw-r--r--src/mongo/db/repl/heartbeat.cpp7
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp10
4 files changed, 15 insertions, 78 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 2db047dde95..0e68364ae11 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -91,8 +91,7 @@ namespace replset {
_pause(true),
_appliedBuffer(true),
_assumingPrimary(false),
- _currentSyncTarget(NULL),
- _consumedOpTime(0, 0) {
+ _currentSyncTarget(NULL) {
}
BackgroundSync* BackgroundSync::get() {
@@ -108,17 +107,7 @@ namespace replset {
}
void BackgroundSync::notify() {
- {
- boost::unique_lock<boost::mutex> lock(s_mutex);
- if (s_instance == NULL) {
- return;
- }
- }
-
- {
- boost::unique_lock<boost::mutex> opLock(s_instance->_lastOpMutex);
- s_instance->_lastOpCond.notify_all();
- }
+ theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
{
boost::unique_lock<boost::mutex> lock(s_instance->_mutex);
@@ -131,55 +120,6 @@ namespace replset {
}
}
- void BackgroundSync::notifierThread() {
- Client::initThread("rsSyncNotifier");
- replLocalAuth();
-
- theReplSet->syncSourceFeedback.go();
-
- while (!inShutdown()) {
- if (!theReplSet) {
- sleepsecs(5);
- continue;
- }
-
- MemberState state = theReplSet->state();
- if (state.primary() || state.fatal() || state.startup()) {
- sleepsecs(5);
- continue;
- }
-
- try {
- {
- boost::unique_lock<boost::mutex> lock(_lastOpMutex);
- while (_consumedOpTime == theReplSet->lastOpTimeWritten) {
- _lastOpCond.wait(lock);
- }
- }
-
- markOplog();
- }
- catch (DBException &e) {
- log() << "replset tracking exception: " << e.getInfo() << rsLog;
- sleepsecs(1);
- }
- catch (std::exception &e2) {
- log() << "replset tracking error" << e2.what() << rsLog;
- sleepsecs(1);
- }
- }
-
- cc().shutdown();
- }
-
- void BackgroundSync::markOplog() {
- LOG(3) << "replset markOplog: " << _consumedOpTime << " "
- << theReplSet->lastOpTimeWritten << rsLog;
-
- _consumedOpTime = theReplSet->lastOpTimeWritten;
- theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
- }
-
void BackgroundSync::producerThread() {
Client::initThread("rsBackgroundSync");
replLocalAuth();
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 2579e54792d..dd2fed481ad 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -91,14 +91,6 @@ namespace replset {
const Member* _currentSyncTarget;
- // Notifier thread
-
- // used to wait until another op has been replicated
- boost::condition_variable _lastOpCond;
- boost::mutex _lastOpMutex;
-
- OpTime _consumedOpTime; // not locked, only used by notifier thread
-
BackgroundSync();
BackgroundSync(const BackgroundSync& s);
BackgroundSync operator=(const BackgroundSync& s);
@@ -119,10 +111,6 @@ namespace replset {
// restart syncing
void start();
- // Tracker thread
- // tells the sync target where this member is synced to
- void markOplog();
-
bool isAssumingPrimary();
public:
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp
index e0f633c8adb..163c4167f47 100644
--- a/src/mongo/db/repl/heartbeat.cpp
+++ b/src/mongo/db/repl/heartbeat.cpp
@@ -248,11 +248,14 @@ namespace mongo {
return;
}
+ // this ensures that will have bgsync's s_instance at all points where it is needed
+ // so that we needn't check for its existence
+ replset::BackgroundSync* sync = replset::BackgroundSync::get();
+
boost::thread t(startSyncThread);
- replset::BackgroundSync* sync = replset::BackgroundSync::get();
boost::thread producer(boost::bind(&replset::BackgroundSync::producerThread, sync));
- boost::thread notifier(boost::bind(&replset::BackgroundSync::notifierThread, sync));
+ theReplSet->syncSourceFeedback.go();
task::fork(ghost);
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index 6dbf2f773c2..263725c26e6 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -239,7 +239,11 @@ namespace mongo {
void SyncSourceFeedback::run() {
Client::initThread("SyncSourceFeedbackThread");
bool sleepNeeded = false;
- while (true) {
+ while (!inShutdown()) {
+ if (!theReplSet) {
+ sleepsecs(5);
+ continue;
+ }
if (sleepNeeded) {
sleepmillis(500);
sleepNeeded = false;
@@ -249,7 +253,8 @@ namespace mongo {
while (!_positionChanged && !_handshakeNeeded) {
_cond.wait(lock);
}
- if (theReplSet->isPrimary()) {
+ MemberState state = theReplSet->state();
+ if (state.primary() || state.fatal() || state.startup()) {
_positionChanged = false;
_handshakeNeeded = false;
continue;
@@ -291,5 +296,6 @@ namespace mongo {
}
}
}
+ cc().shutdown();
}
}