summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2013-07-12 14:24:08 -0400
committerEric Milkie <milkie@10gen.com>2013-07-12 15:14:09 -0400
commit6486b4035c5ac52679eb3e1a034c925ccdd20deb (patch)
treec74c2fe36c819534e7444cdbe1aa12f533664304 /src/mongo
parentac386485a9b9f141a518f2aca1a2b66b8f976104 (diff)
downloadmongo-6486b4035c5ac52679eb3e1a034c925ccdd20deb.tar.gz
Revert "SERVER-6071 use command on local.slaves instead of cursor"
This reverts commit 2267744af0e972bceccb4ff4e9ed19a1ed639d2e. Revert "SERVER-6071 correct assert code" This reverts commit 85ca38c33e4aaebad539e78a05aed329b09c1208. Conflicts: src/mongo/db/repl/sync_source_feedback.cpp Revert "SERVER-6071 use command on local.slaves instead of cursor" This reverts commit 83ecb9775b7997dd5115c53f6ea30d2e368a4244. Conflicts: src/mongo/db/repl/replset_commands.cpp src/mongo/db/repl/sync_source_feedback.cpp
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/auth/action_types.txt1
-rw-r--r--src/mongo/db/repl/bgsync.cpp68
-rw-r--r--src/mongo/db/repl/bgsync.h1
-rw-r--r--src/mongo/db/repl/health.cpp9
-rw-r--r--src/mongo/db/repl/oplogreader.cpp2
-rw-r--r--src/mongo/db/repl/oplogreader.h3
-rw-r--r--src/mongo/db/repl/replset_commands.cpp39
-rw-r--r--src/mongo/db/repl/rs.cpp7
-rw-r--r--src/mongo/db/repl/rs.h4
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp310
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h171
-rw-r--r--src/mongo/db/repl/write_concern.cpp46
-rw-r--r--src/mongo/db/repl/write_concern.h2
14 files changed, 45 insertions, 619 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 464bc81b720..d61190c3188 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -456,7 +456,6 @@ serverOnlyFiles = [ "db/curop.cpp",
"db/repl/master_slave.cpp",
"db/repl/finding_start_cursor.cpp",
"db/repl/sync.cpp",
- "db/repl/sync_source_feedback.cpp",
"db/repl/optime.cpp",
"db/repl/oplogreader.cpp",
"db/repl/replication_server_status.cpp",
diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt
index bebec04b78e..057084293a3 100644
--- a/src/mongo/db/auth/action_types.txt
+++ b/src/mongo/db/auth/action_types.txt
@@ -71,7 +71,6 @@
"replSetReconfig",
"replSetStepDown",
"replSetSyncFrom",
-"replSetUpdatePosition",
"resync",
"serverStatus",
"setParameter",
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index f453664262d..6e9e7538f5a 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -22,7 +22,6 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/rs_sync.h"
-#include "mongo/db/repl/rs.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/base/counter.h"
#include "mongo/db/stats/timer_stats.h"
@@ -80,6 +79,7 @@ namespace replset {
_assumingPrimary(false),
_currentSyncTarget(NULL),
_oplogMarkerTarget(NULL),
+ _oplogMarker(true /* doHandshake */),
_consumedOpTime(0, 0) {
}
@@ -122,7 +122,6 @@ namespace replset {
void BackgroundSync::notifierThread() {
Client::initThread("rsSyncNotifier");
replLocalAuth();
- theReplSet->syncSourceFeedback.go();
while (!inShutdown()) {
bool clearTarget = false;
@@ -169,44 +168,37 @@ namespace replset {
}
void BackgroundSync::markOplog() {
- LOG(3) << "replset markOplog: " << _consumedOpTime << " "
- << theReplSet->lastOpTimeWritten << rsLog;
+ LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog;
- if (theReplSet->syncSourceFeedback.supportsUpdater()) {
- theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
- _consumedOpTime = theReplSet->lastOpTimeWritten;
+ if (!hasCursor()) {
+ sleepsecs(1);
+ return;
}
- else {
- if (!hasCursor()) {
- return;
- }
- if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) {
- theReplSet->syncSourceFeedback.more();
- }
-
- if (!theReplSet->syncSourceFeedback.more()) {
- theReplSet->syncSourceFeedback.tailCheck();
- return;
- }
+ if (!_oplogMarker.moreInCurrentBatch()) {
+ _oplogMarker.more();
+ }
- // if this member has written the op at optime T
- // we want to nextSafe up to and including T
- while (_consumedOpTime < theReplSet->lastOpTimeWritten
- && theReplSet->syncSourceFeedback.more()) {
- BSONObj temp = theReplSet->syncSourceFeedback.nextSafe();
- _consumedOpTime = temp["ts"]._opTime();
- }
+ if (!_oplogMarker.more()) {
+ _oplogMarker.tailCheck();
+ sleepsecs(1);
+ return;
+ }
- // call more() to signal the sync target that we've synced T
- theReplSet->syncSourceFeedback.more();
+ // if this member has written the op at optime T, we want to nextSafe up to and including T
+ while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) {
+ BSONObj temp = _oplogMarker.nextSafe();
+ _consumedOpTime = temp["ts"]._opTime();
}
+
+ // call more() to signal the sync target that we've synced T
+ _oplogMarker.more();
}
bool BackgroundSync::hasCursor() {
{
// prevent writers from blocking readers during fsync
- SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
+ SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
// we don't need the local write lock yet, but it's needed by OplogReader::connect
// so we take it preemptively to avoid deadlocking.
Lock::DBWrite lk("local");
@@ -218,23 +210,25 @@ namespace replset {
return false;
}
- log() << "replset setting oplog notifier to "
- << _currentSyncTarget->fullName() << rsLog;
+ log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog;
_oplogMarkerTarget = _currentSyncTarget;
- if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) {
+ _oplogMarker.resetConnection();
+
+ if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) {
+ LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog;
_oplogMarkerTarget = NULL;
return false;
}
}
}
- if (!theReplSet->syncSourceFeedback.haveCursor()) {
+
+ if (!_oplogMarker.haveCursor()) {
BSONObj fields = BSON("ts" << 1);
- theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog,
- theReplSet->lastOpTimeWritten, &fields);
+ _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields);
}
- return theReplSet->syncSourceFeedback.haveCursor();
+ return _oplogMarker.haveCursor();
}
void BackgroundSync::producerThread() {
@@ -531,8 +525,6 @@ namespace replset {
_currentSyncTarget = target;
}
- theReplSet->syncSourceFeedback.connect(target);
-
return;
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 02d359cfee5..1f4cb3b1530 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -86,6 +86,7 @@ namespace replset {
boost::mutex _lastOpMutex;
const Member* _oplogMarkerTarget;
+ OplogReader _oplogMarker; // not locked, only used by notifier thread
OpTime _consumedOpTime; // not locked, only used by notifier thread
BackgroundSync();
diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp
index 297ebde93f5..3a2681128f8 100644
--- a/src/mongo/db/repl/health.cpp
+++ b/src/mongo/db/repl/health.cpp
@@ -348,15 +348,6 @@ namespace mongo {
return 0;
}
- Member* ReplSetImpl::getMutableMember(unsigned id) {
- if( _self && id == _self->id() ) return _self;
-
- for( Member *m = head(); m; m = m->next() )
- if( m->id() == id )
- return m;
- return 0;
- }
-
Member* ReplSetImpl::findByName(const std::string& hostname) const {
if (_self && hostname == _self->fullName()) {
return _self;
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index ea55ada5c11..5ee5a4628b4 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -140,7 +140,7 @@ namespace mongo {
if( conn() == 0 ) {
_conn = shared_ptr<DBClientConnection>(new DBClientConnection(false,
0,
- tcp_timeout));
+ 30 /* tcp timeout */));
string errmsg;
if ( !_conn->connect(hostName.c_str(), errmsg) ||
(AuthorizationManager::isAuthEnabled() && !replAuthenticate(_conn.get(), true)) ) {
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 345dc2be9ee..801b9a4854e 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -50,9 +50,6 @@ namespace mongo {
return findOne(ns, Query().sort(reverseNaturalObj));
}
- /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */
- static const int tcp_timeout = 30;
-
/* ok to call if already connected */
bool connect(const std::string& hostname);
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index b20cfa8e34b..e8de0043ea8 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -24,7 +24,6 @@
#include "mongo/db/cmdline.h"
#include "mongo/db/commands.h"
#include "mongo/db/repl/health.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_server_status.h" // replSettings
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_config.h"
@@ -392,42 +391,4 @@ namespace mongo {
}
} cmdReplSetSyncFrom;
- class CmdReplSetUpdatePosition: public ReplSetCommand {
- public:
- virtual void help( stringstream &help ) const {
- help << "internal";
- }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::replSetUpdatePosition);
- out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
- }
- CmdReplSetUpdatePosition() : ReplSetCommand("replSetUpdatePosition") { }
- virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg,
- BSONObjBuilder& result, bool fromRepl) {
- if (!check(errmsg, result))
- return false;
-
- if (cmdObj.hasField("handshake")) {
- // we have received a handshake, not an update message
- // handshakes are done here to ensure the receiving end supports the update command
- cc().gotHandshake(cmdObj["handshake"].embeddedObject());
- // if we aren't primary, pass the handshake along
- if (!theReplSet->isPrimary() && theReplSet->syncSourceFeedback.supportsUpdater()) {
- theReplSet->syncSourceFeedback.forwardSlaveHandshake();
- }
- return true;
- }
-
- uassert(16888, "optimes field should be an array with an object for each secondary",
- cmdObj["optimes"].type() == Array);
- BSONArray newTimes = BSONArray(cmdObj["optimes"].Obj());
- updateSlaveLocations(newTimes);
-
- return true;
- }
- } cmdReplSetUpdatePosition;
-
}
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
index e0c58423611..0f539601485 100644
--- a/src/mongo/db/repl/rs.cpp
+++ b/src/mongo/db/repl/rs.cpp
@@ -925,11 +925,8 @@ namespace mongo {
void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) {
// To prevent race conditions with clearing the cache at reconfig time,
// we lock the replset mutex here.
- {
- lock lk(this);
- ghost->associateSlave(rid, memberId);
- }
- syncSourceFeedback.associateMember(rid, memberId);
+ lock lk(this);
+ ghost->associateSlave(rid, memberId);
}
class ReplIndexPrefetch : public ServerParameter {
diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h
index 74d36edbb23..4190ded55c1 100644
--- a/src/mongo/db/repl/rs.h
+++ b/src/mongo/db/repl/rs.h
@@ -26,7 +26,6 @@
#include "mongo/db/repl/rs_exception.h"
#include "mongo/db/repl/rs_member.h"
#include "mongo/db/repl/rs_sync.h"
-#include "mongo/db/repl/sync_source_feedback.h"
#include "mongo/util/concurrency/list.h"
#include "mongo/util/concurrency/msg.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -342,8 +341,6 @@ namespace mongo {
StateBox box;
- SyncSourceFeedback syncSourceFeedback;
-
OpTime lastOpTimeWritten;
long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
bool forceSyncFrom(const string& host, string& errmsg, BSONObjBuilder& result);
@@ -508,7 +505,6 @@ namespace mongo {
Member* head() const { return _members.head(); }
public:
const Member* findById(unsigned id) const;
- Member* getMutableMember(unsigned id);
Member* findByName(const std::string& hostname) const;
private:
void _getTargets(list<Target>&, int &configVersion);
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
deleted file mode 100644
index 21a7bffd3c4..00000000000
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
-* Copyright (C) 2013 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "mongo/db/repl/sync_source_feedback.h"
-
-#include "mongo/client/constants.h"
-#include "mongo/client/dbclientcursor.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/auth/security_key.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/repl/bgsync.h"
-#include "mongo/db/repl/rs.h" // theReplSet
-
-namespace mongo {
-
- // used in replAuthenticate
- static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
-
- void SyncSourceFeedback::associateMember(const BSONObj& id, const int memberId) {
- const OID rid = id["_id"].OID();
- boost::unique_lock<boost::mutex> lock(_mtx);
- _handshakeNeeded = true;
- _members[rid] = theReplSet->getMutableMember(memberId);
- _cond.notify_all();
- }
-
- bool SyncSourceFeedback::replAuthenticate(bool skipAuthCheck) {
-
- if (!AuthorizationManager::isAuthEnabled()) {
- return true;
- }
- if (!skipAuthCheck && !cc().getAuthorizationSession()->hasInternalAuthorization()) {
- log() << "replauthenticate: requires internal authorization, failing" << endl;
- return false;
- }
-
- if (isInternalAuthSet()) {
- return authenticateInternalUser(_connection.get());
- }
-
- BSONObj user;
- {
- Client::ReadContext ctxt("local.");
- if(!Helpers::findOne("local.system.users", userReplQuery, user) ||
- // try the first user in local
- !Helpers::getSingleton("local.system.users", user)) {
- log() << "replauthenticate: no user in local.system.users to use"
- << "for authentication" << endl;
- return false;
- }
- }
- std::string u = user.getStringField("user");
- std::string p = user.getStringField("pwd");
- massert(16889, "bad user object? [1]", !u.empty());
- massert(16887, "bad user object? [2]", !p.empty());
-
- std::string err;
-
- if( !_connection->auth("local", u.c_str(), p.c_str(), err, false) ) {
- log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
- return false;
- }
-
- return true;
- }
-
- void SyncSourceFeedback::ensureMe() {
- string myname = getHostName();
- {
- Client::WriteContext ctx("local");
- // local.me is an identifier for a server for getLastError w:2+
- if (!Helpers::getSingleton("local.me", _me) ||
- !_me.hasField("host") ||
- _me["host"].String() != myname) {
-
- // clean out local.me
- Helpers::emptyCollection("local.me");
-
- // repopulate
- BSONObjBuilder b;
- b.appendOID("_id", 0, true);
- b.append("host", myname);
- _me = b.obj();
- Helpers::putSingleton("local.me", _me);
- }
- }
- }
-
- bool SyncSourceFeedback::replHandshake() {
- ensureMe();
-
- // handshake for us
- BSONObjBuilder cmd;
- cmd.append("replSetUpdatePosition", 1);
- BSONObjBuilder sub (cmd.subobjStart("handshake"));
- sub.appendAs(_me["_id"], "handshake");
- sub.append("member", theReplSet->selfId());
- sub.append("config", theReplSet->myConfig().asBson());
- sub.doneFast();
-
- BSONObj res;
- try {
- if (!_connection->runCommand("admin", cmd.obj(), res)) {
- if (res["errmsg"].str().find("no such cmd") != std::string::npos) {
- _supportsUpdater = false;
- }
- resetConnection();
- return false;
- }
- else {
- _supportsUpdater = true;
- }
- }
- catch (const DBException& e) {
- log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl;
- resetConnection();
- return false;
- }
-
- // handshakes for those connected to us
- {
- for (OIDMemberMap::iterator itr = _members.begin();
- itr != _members.end(); ++itr) {
- BSONObjBuilder slaveCmd;
- slaveCmd.append("replSetUpdatePosition", 1);
- // outer handshake indicates this is a handshake command
- // inner is needed as part of the structure to be passed to gotHandshake
- BSONObjBuilder slaveSub (slaveCmd.subobjStart("handshake"));
- slaveSub.append("handshake", itr->first);
- slaveSub.append("member", itr->second->id());
- slaveSub.append("config", itr->second->config().asBson());
- slaveSub.doneFast();
- BSONObj slaveRes;
- try {
- if (!_connection->runCommand("admin", slaveCmd.obj(), slaveRes)) {
- resetConnection();
- return false;
- }
- }
- catch (const DBException& e) {
- log() << "SyncSourceFeedback error sending chained handshakes: "
- << e.what() << endl;
- resetConnection();
- return false;
- }
- }
- }
- return true;
- }
-
- bool SyncSourceFeedback::_connect(const std::string& hostName) {
- if (hasConnection()) {
- return true;
- }
- _connection.reset(new DBClientConnection(false, 0, OplogReader::tcp_timeout));
- string errmsg;
- if (!_connection->connect(hostName.c_str(), errmsg) ||
- (AuthorizationManager::isAuthEnabled() && !replAuthenticate(true))) {
- resetConnection();
- log() << "repl: " << errmsg << endl;
- return false;
- }
-
- if (!replHandshake()) {
- if (!supportsUpdater()) {
- return connectOplogReader(hostName);
- }
- return false;
- }
- return true;
- }
-
- bool SyncSourceFeedback::connect(const Member* target) {
- boost::unique_lock<boost::mutex> lock(_connmtx);
- resetConnection();
- resetOplogReaderConnection();
- _syncTarget = target;
- if (_connect(target->fullName())) {
- if (!supportsUpdater()) {
- return true;
- }
- }
- return false;
- }
-
- void SyncSourceFeedback::forwardSlaveHandshake() {
- boost::unique_lock<boost::mutex> lock(_mtx);
- _handshakeNeeded = true;
- }
-
- void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) {
- boost::unique_lock<boost::mutex> lock(_mtx);
- LOG(1) << "replSet last: " << _slaveMap[rid].toString() << " to " << ot.toString() << endl;
- // only update if ot is newer than what we have already
- if (ot > _slaveMap[rid]) {
- _slaveMap[rid] = ot;
- _positionChanged = true;
- LOG(2) << "now last is " << _slaveMap[rid].toString() << endl;
- _cond.notify_all();
- }
- }
-
- bool SyncSourceFeedback::updateUpstream() {
- if (theReplSet->isPrimary()) {
- // primary has no one to update to
- return true;
- }
- BSONObjBuilder cmd;
- cmd.append("replSetUpdatePosition", 1);
- // create an array containing objects each member connected to us and for ourself
- BSONArrayBuilder array (cmd.subarrayStart("optimes"));
- OID myID = _me["_id"].OID();
- {
- for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin();
- itr != _slaveMap.end(); ++itr) {
- BSONObjBuilder entry(array.subobjStart());
- entry.append("_id", itr->first);
- entry.append("optime", itr->second);
- if (itr->first == myID) {
- entry.append("config", theReplSet->myConfig().asBson());
- }
- else {
- entry.append("config", _members[itr->first]->config().asBson());
- }
- entry.doneFast();
- }
- }
- array.done();
- BSONObj res;
-
- bool ok;
- try {
- ok = _connection->runCommand("admin", cmd.obj(), res);
- }
- catch (const DBException& e) {
- log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
- resetConnection();
- return false;
- }
- if (!ok) {
- log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl;
- resetConnection();
- return false;
- }
- return true;
- }
-
- void SyncSourceFeedback::run() {
- Client::initThread("SyncSourceFeedbackThread");
- while (true) {
- {
- boost::unique_lock<boost::mutex> lock(_mtx);
- while (!_positionChanged && !_handshakeNeeded) {
- _cond.wait(lock);
- }
- boost::unique_lock<boost::mutex> conlock(_connmtx);
- const Member* target = replset::BackgroundSync::get()->getSyncTarget();
- if (_syncTarget != target) {
- resetConnection();
- _syncTarget = target;
- }
- if (!hasConnection()) {
- // fix connection if need be
- if (!target) {
- continue;
- }
- if (!_connect(target->fullName())) {
- continue;
- }
- else if (!supportsUpdater()) {
- _handshakeNeeded = false;
- _positionChanged = false;
- continue;
- }
- }
- if (_handshakeNeeded) {
- if (!replHandshake()) {
- _handshakeNeeded = true;
- continue;
- }
- else {
- _handshakeNeeded = false;
- }
- }
- if (_positionChanged) {
- if (!updateUpstream()) {
- _positionChanged = true;
- continue;
- }
- else {
- _positionChanged = false;
- }
- }
- }
- }
- }
-}
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
deleted file mode 100644
index 72073841029..00000000000
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
-* Copyright (C) 2013 10gen Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-
-#pragma once
-
-#include "mongo/db/repl/oplogreader.h"
-#include "mongo/util/background.h"
-
-
-namespace mongo {
-
- class Member;
-
- class SyncSourceFeedback : public BackgroundJob {
- public:
- SyncSourceFeedback() : BackgroundJob(false /*don't selfdelete*/),
- _syncTarget(NULL),
- _oplogReader(new OplogReader(true)),
- _supportsUpdater(true) {}
-
- ~SyncSourceFeedback() {
- delete _oplogReader;
- }
-
- /// Adds an entry to _member for a secondary that has connected to us.
- void associateMember(const BSONObj& id, const int memberId);
-
- /// Passes handshake up the replication chain, upon receiving a handshake.
- void forwardSlaveHandshake();
-
- void updateSelfInMap(const OpTime& ot) {
- ensureMe();
- updateMap(_me["_id"].OID(), ot);
- }
-
- /// Connect to sync target and create OplogReader if needed.
- bool connect(const Member* target);
-
- void resetConnection() {
- _connection.reset();
- }
-
- void resetOplogReaderConnection() {
- _oplogReader->resetConnection();
- }
-
- /// Used extensively in bgsync, to see if we need to use the OplogReader syncing method.
- bool supportsUpdater() const {
- // oplogReader will be NULL if new updater is supported
- //boost::unique_lock<boost::mutex> lock(_mtx);
- return _supportsUpdater;
- }
-
- /// Updates the _slaveMap to be forwarded to the sync target.
- void updateMap(const mongo::OID& rid, const OpTime& ot);
-
- std::string name() const { return "SyncSourceFeedbackThread"; }
-
- /// Loops forever, passing updates when they are present.
- void run();
-
- /* The below methods just fall through to OplogReader and are only used when our sync target
- * does not support the update command.
- */
- bool connectOplogReader(const std::string& hostName) {
- return _oplogReader->connect(hostName);
- }
-
- bool connect(const BSONObj& rid, const int from, const string& to) {
- return _oplogReader->connect(rid, from, to);
- }
-
- void ghostQueryGTE(const char *ns, OpTime t) {
- _oplogReader->ghostQueryGTE(ns, t);
- }
-
- bool haveCursor() {
- return _oplogReader->haveCursor();
- }
-
- bool more() {
- return _oplogReader->more();
- }
-
- bool moreInCurrentBatch() {
- return _oplogReader->moreInCurrentBatch();
- }
-
- BSONObj nextSafe() {
- return _oplogReader->nextSafe();
- }
-
- void tailCheck() {
- _oplogReader->tailCheck();
- }
-
- void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) {
- _oplogReader->tailingQueryGTE(ns, t, fields);
- }
-
- private:
- /// Ensures local.me is populated and populates it if not.
- void ensureMe();
-
- /* Generally replAuthenticate will only be called within system threads to fully
- * authenticate connections to other nodes in the cluster that will be used as part of
- * internal operations. If a user-initiated action results in needing to call
- * replAuthenticate, you can call it with skipAuthCheck set to false. Only do this if you
- * are certain that the proper auth checks have already run to ensure that the user is
- * authorized to do everything that this connection will be used for!
- */
- bool replAuthenticate(bool skipAuthCheck);
-
- /* Sends initialization information to our sync target, also determines whether or not they
- * support the updater command.
- */
- bool replHandshake();
-
- /* Inform the sync target of our current position in the oplog, as well as the positions
- * of all secondaries chained through us.
- */
- bool updateUpstream();
-
- bool hasConnection() {
- return _connection.get();
- }
-
- /// Connect to sync target and create OplogReader if needed.
- bool _connect(const std::string& hostName);
-
- // stores our OID to be passed along in commands
- BSONObj _me;
- // the member we are currently syncing from
- const Member* _syncTarget;
- // holds the oplogReader for use when we fall back to old style updates
- OplogReader* _oplogReader;
- // our connection to our sync target
- boost::scoped_ptr<DBClientConnection> _connection;
- // tracks whether we are in fallback mode or not
- bool _supportsUpdater;
- // protects connection
- boost::mutex _connmtx;
- // protects cond and maps and the indicator bools
- boost::mutex _mtx;
- // contains the most recent optime of each member syncing to us
- map<mongo::OID, OpTime> _slaveMap;
- typedef map<mongo::OID, Member*> OIDMemberMap;
- // contains a pointer to each member, which we can look up by oid
- OIDMemberMap _members;
- // used to alert our thread of changes which need to be passed up the chain
- boost::condition _cond;
- // used to indicate a position change which has not yet been pushed along
- bool _positionChanged;
- // used to indicate a connection change which has not yet been shook on
- bool _handshakeNeeded;
- };
-}
diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp
index d05575f2135..0564bc0a678 100644
--- a/src/mongo/db/repl/write_concern.cpp
+++ b/src/mongo/db/repl/write_concern.cpp
@@ -122,22 +122,20 @@ namespace mongo {
scoped_lock mylk(_mutex);
- if (last > _slaves[ident]) {
- _slaves[ident] = last;
- _dirty = true;
+ _slaves[ident] = last;
+ _dirty = true;
- if (theReplSet && theReplSet->isPrimary()) {
- theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
- }
-
- if ( ! _started ) {
- // start background thread here since we definitely need it
- _started = true;
- go();
- }
+ if (theReplSet && theReplSet->isPrimary()) {
+ theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
+ }
- _threadsWaitingForReplication.notify_all();
+ if ( ! _started ) {
+ // start background thread here since we definitely need it
+ _started = true;
+ go();
}
+
+ _threadsWaitingForReplication.notify_all();
}
bool opReplicatedEnough( OpTime op , BSONElement w ) {
@@ -255,28 +253,6 @@ namespace mongo {
const char * SlaveTracking::NS = "local.slaves";
- // parse optimes from replUpdatePositionCommand and pass them to SyncSourceFeedback
- void updateSlaveLocations(BSONArray optimes) {
- BSONForEach(elem, optimes) {
- BSONObj entry = elem.Obj();
- BSONObj id = BSON("_id" << entry["_id"].OID());
- OpTime ot = entry["optime"]._opTime();
- BSONObj config = entry["config"].Obj();
-
- // update locally
- slaveTracking.update(id, config, "local.oplog.rs", ot);
- if (theReplSet && !theReplSet->isPrimary()) {
- // pass along if we are not primary
- theReplSet->syncSourceFeedback.updateMap(entry["_id"].OID(), ot);
- // for to be backwards compatible
- theReplSet->ghost->send(boost::bind(&GhostSync::percolate,
- theReplSet->ghost,
- id,
- ot));
- }
- }
- }
-
void updateSlaveLocation( CurOp& curop, const char * ns , OpTime lastOp ) {
if ( lastOp.isNull() )
return;
diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h
index b28ae1ae830..d9b0163635b 100644
--- a/src/mongo/db/repl/write_concern.h
+++ b/src/mongo/db/repl/write_concern.h
@@ -29,8 +29,6 @@ namespace mongo {
class CurOp;
- void updateSlaveLocations(BSONArray optimes);
-
void updateSlaveLocation( CurOp& curop, const char * oplog_ns , OpTime lastOp );
/** @return true if op has made it to w servers */