summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2013-07-15 14:30:17 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2013-07-22 10:43:40 -0400
commit27c4e7fbd2ef6eeb04dccd1bcdecdb21b00522d1 (patch)
treef129314545bfcc0a5c64fb2701baa2fc4f1628a2 /src/mongo/db
parent9bf70757db09b3a7166440c508bf798d50bd212a (diff)
downloadmongo-27c4e7fbd2ef6eeb04dccd1bcdecdb21b00522d1.tar.gz
Revert "Revert "SERVER-6071 use command on local.slaves instead of cursor""
This reverts commit 6486b4035c5ac52679eb3e1a034c925ccdd20deb.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/auth/action_types.txt1
-rw-r--r--src/mongo/db/repl/bgsync.cpp68
-rw-r--r--src/mongo/db/repl/bgsync.h1
-rw-r--r--src/mongo/db/repl/health.cpp9
-rw-r--r--src/mongo/db/repl/oplogreader.cpp2
-rw-r--r--src/mongo/db/repl/oplogreader.h3
-rw-r--r--src/mongo/db/repl/replset_commands.cpp39
-rw-r--r--src/mongo/db/repl/rs.cpp7
-rw-r--r--src/mongo/db/repl/rs.h4
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp310
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h171
-rw-r--r--src/mongo/db/repl/write_concern.cpp46
-rw-r--r--src/mongo/db/repl/write_concern.h2
13 files changed, 618 insertions, 45 deletions
diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt
index 0cf6c33223f..532f1a41564 100644
--- a/src/mongo/db/auth/action_types.txt
+++ b/src/mongo/db/auth/action_types.txt
@@ -72,6 +72,7 @@
"replSetReconfig",
"replSetStepDown",
"replSetSyncFrom",
+"replSetUpdatePosition",
"resync",
"serverStatus",
"setParameter",
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 6e9e7538f5a..f453664262d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -22,6 +22,7 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/rs.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/base/counter.h"
#include "mongo/db/stats/timer_stats.h"
@@ -79,7 +80,6 @@ namespace replset {
_assumingPrimary(false),
_currentSyncTarget(NULL),
_oplogMarkerTarget(NULL),
- _oplogMarker(true /* doHandshake */),
_consumedOpTime(0, 0) {
}
@@ -122,6 +122,7 @@ namespace replset {
void BackgroundSync::notifierThread() {
Client::initThread("rsSyncNotifier");
replLocalAuth();
+ theReplSet->syncSourceFeedback.go();
while (!inShutdown()) {
bool clearTarget = false;
@@ -168,37 +169,44 @@ namespace replset {
}
void BackgroundSync::markOplog() {
- LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog;
+ LOG(3) << "replset markOplog: " << _consumedOpTime << " "
+ << theReplSet->lastOpTimeWritten << rsLog;
- if (!hasCursor()) {
- sleepsecs(1);
- return;
+ if (theReplSet->syncSourceFeedback.supportsUpdater()) {
+ theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
+ _consumedOpTime = theReplSet->lastOpTimeWritten;
}
+ else {
+ if (!hasCursor()) {
+ return;
+ }
- if (!_oplogMarker.moreInCurrentBatch()) {
- _oplogMarker.more();
- }
+ if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) {
+ theReplSet->syncSourceFeedback.more();
+ }
- if (!_oplogMarker.more()) {
- _oplogMarker.tailCheck();
- sleepsecs(1);
- return;
- }
+ if (!theReplSet->syncSourceFeedback.more()) {
+ theReplSet->syncSourceFeedback.tailCheck();
+ return;
+ }
- // if this member has written the op at optime T, we want to nextSafe up to and including T
- while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) {
- BSONObj temp = _oplogMarker.nextSafe();
- _consumedOpTime = temp["ts"]._opTime();
- }
+ // if this member has written the op at optime T
+ // we want to nextSafe up to and including T
+ while (_consumedOpTime < theReplSet->lastOpTimeWritten
+ && theReplSet->syncSourceFeedback.more()) {
+ BSONObj temp = theReplSet->syncSourceFeedback.nextSafe();
+ _consumedOpTime = temp["ts"]._opTime();
+ }
- // call more() to signal the sync target that we've synced T
- _oplogMarker.more();
+ // call more() to signal the sync target that we've synced T
+ theReplSet->syncSourceFeedback.more();
+ }
}
bool BackgroundSync::hasCursor() {
{
// prevent writers from blocking readers during fsync
- SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
+ SimpleMutex::scoped_lock fsynclk(filesLockedFsync);
// we don't need the local write lock yet, but it's needed by OplogReader::connect
// so we take it preemptively to avoid deadlocking.
Lock::DBWrite lk("local");
@@ -210,25 +218,23 @@ namespace replset {
return false;
}
- log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog;
+ log() << "replset setting oplog notifier to "
+ << _currentSyncTarget->fullName() << rsLog;
_oplogMarkerTarget = _currentSyncTarget;
- _oplogMarker.resetConnection();
-
- if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) {
- LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog;
+ if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) {
_oplogMarkerTarget = NULL;
return false;
}
}
}
-
- if (!_oplogMarker.haveCursor()) {
+ if (!theReplSet->syncSourceFeedback.haveCursor()) {
BSONObj fields = BSON("ts" << 1);
- _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields);
+ theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog,
+ theReplSet->lastOpTimeWritten, &fields);
}
- return _oplogMarker.haveCursor();
+ return theReplSet->syncSourceFeedback.haveCursor();
}
void BackgroundSync::producerThread() {
@@ -525,6 +531,8 @@ namespace replset {
_currentSyncTarget = target;
}
+ theReplSet->syncSourceFeedback.connect(target);
+
return;
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 1f4cb3b1530..02d359cfee5 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -86,7 +86,6 @@ namespace replset {
boost::mutex _lastOpMutex;
const Member* _oplogMarkerTarget;
- OplogReader _oplogMarker; // not locked, only used by notifier thread
OpTime _consumedOpTime; // not locked, only used by notifier thread
BackgroundSync();
diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp
index 7dd9f528d27..4dc089de971 100644
--- a/src/mongo/db/repl/health.cpp
+++ b/src/mongo/db/repl/health.cpp
@@ -348,6 +348,15 @@ namespace mongo {
return 0;
}
+ Member* ReplSetImpl::getMutableMember(unsigned id) {
+ if( _self && id == _self->id() ) return _self;
+
+ for( Member *m = head(); m; m = m->next() )
+ if( m->id() == id )
+ return m;
+ return 0;
+ }
+
Member* ReplSetImpl::findByName(const std::string& hostname) const {
if (_self && hostname == _self->fullName()) {
return _self;
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 5ee5a4628b4..ea55ada5c11 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -140,7 +140,7 @@ namespace mongo {
if( conn() == 0 ) {
_conn = shared_ptr<DBClientConnection>(new DBClientConnection(false,
0,
- 30 /* tcp timeout */));
+ tcp_timeout));
string errmsg;
if ( !_conn->connect(hostName.c_str(), errmsg) ||
(AuthorizationManager::isAuthEnabled() && !replAuthenticate(_conn.get(), true)) ) {
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 801b9a4854e..345dc2be9ee 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -50,6 +50,9 @@ namespace mongo {
return findOne(ns, Query().sort(reverseNaturalObj));
}
+ /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */
+ static const int tcp_timeout = 30;
+
/* ok to call if already connected */
bool connect(const std::string& hostname);
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index 1f9abce2b39..8e765f7b5a2 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -24,6 +24,7 @@
#include "mongo/db/cmdline.h"
#include "mongo/db/commands.h"
#include "mongo/db/repl/health.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_server_status.h" // replSettings
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rs_config.h"
@@ -396,4 +397,42 @@ namespace mongo {
}
} cmdReplSetSyncFrom;
+ class CmdReplSetUpdatePosition: public ReplSetCommand {
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "internal";
+ }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::replSetUpdatePosition);
+ out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
+ }
+ CmdReplSetUpdatePosition() : ReplSetCommand("replSetUpdatePosition") { }
+ virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg,
+ BSONObjBuilder& result, bool fromRepl) {
+ if (!check(errmsg, result))
+ return false;
+
+ if (cmdObj.hasField("handshake")) {
+ // we have received a handshake, not an update message
+ // handshakes are done here to ensure the receiving end supports the update command
+ cc().gotHandshake(cmdObj["handshake"].embeddedObject());
+ // if we aren't primary, pass the handshake along
+ if (!theReplSet->isPrimary() && theReplSet->syncSourceFeedback.supportsUpdater()) {
+ theReplSet->syncSourceFeedback.forwardSlaveHandshake();
+ }
+ return true;
+ }
+
+ uassert(16888, "optimes field should be an array with an object for each secondary",
+ cmdObj["optimes"].type() == Array);
+ BSONArray newTimes = BSONArray(cmdObj["optimes"].Obj());
+ updateSlaveLocations(newTimes);
+
+ return true;
+ }
+ } cmdReplSetUpdatePosition;
+
}
diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp
index 85d4c7a5b6d..4d7a635d621 100644
--- a/src/mongo/db/repl/rs.cpp
+++ b/src/mongo/db/repl/rs.cpp
@@ -928,8 +928,11 @@ namespace mongo {
void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) {
// To prevent race conditions with clearing the cache at reconfig time,
// we lock the replset mutex here.
- lock lk(this);
- ghost->associateSlave(rid, memberId);
+ {
+ lock lk(this);
+ ghost->associateSlave(rid, memberId);
+ }
+ syncSourceFeedback.associateMember(rid, memberId);
}
class ReplIndexPrefetch : public ServerParameter {
diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h
index e1414b6603d..46ee578dc22 100644
--- a/src/mongo/db/repl/rs.h
+++ b/src/mongo/db/repl/rs.h
@@ -26,6 +26,7 @@
#include "mongo/db/repl/rs_exception.h"
#include "mongo/db/repl/rs_member.h"
#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/sync_source_feedback.h"
#include "mongo/util/concurrency/list.h"
#include "mongo/util/concurrency/msg.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -341,6 +342,8 @@ namespace mongo {
StateBox box;
+ SyncSourceFeedback syncSourceFeedback;
+
OpTime lastOpTimeWritten;
long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
bool forceSyncFrom(const string& host, string& errmsg, BSONObjBuilder& result);
@@ -505,6 +508,7 @@ namespace mongo {
Member* head() const { return _members.head(); }
public:
const Member* findById(unsigned id) const;
+ Member* getMutableMember(unsigned id);
Member* findByName(const std::string& hostname) const;
private:
void _getTargets(list<Target>&, int &configVersion);
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
new file mode 100644
index 00000000000..21a7bffd3c4
--- /dev/null
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -0,0 +1,310 @@
+/**
+* Copyright (C) 2013 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "mongo/db/repl/sync_source_feedback.h"
+
+#include "mongo/client/constants.h"
+#include "mongo/client/dbclientcursor.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/security_key.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/rs.h" // theReplSet
+
+namespace mongo {
+
+ // used in replAuthenticate
+ static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
+
+ void SyncSourceFeedback::associateMember(const BSONObj& id, const int memberId) {
+ const OID rid = id["_id"].OID();
+ boost::unique_lock<boost::mutex> lock(_mtx);
+ _handshakeNeeded = true;
+ _members[rid] = theReplSet->getMutableMember(memberId);
+ _cond.notify_all();
+ }
+
+ bool SyncSourceFeedback::replAuthenticate(bool skipAuthCheck) {
+
+ if (!AuthorizationManager::isAuthEnabled()) {
+ return true;
+ }
+ if (!skipAuthCheck && !cc().getAuthorizationSession()->hasInternalAuthorization()) {
+ log() << "replauthenticate: requires internal authorization, failing" << endl;
+ return false;
+ }
+
+ if (isInternalAuthSet()) {
+ return authenticateInternalUser(_connection.get());
+ }
+
+ BSONObj user;
+ {
+ Client::ReadContext ctxt("local.");
+ if(!Helpers::findOne("local.system.users", userReplQuery, user) ||
+ // try the first user in local
+ !Helpers::getSingleton("local.system.users", user)) {
+ log() << "replauthenticate: no user in local.system.users to use"
+ << "for authentication" << endl;
+ return false;
+ }
+ }
+ std::string u = user.getStringField("user");
+ std::string p = user.getStringField("pwd");
+ massert(16889, "bad user object? [1]", !u.empty());
+ massert(16887, "bad user object? [2]", !p.empty());
+
+ std::string err;
+
+ if( !_connection->auth("local", u.c_str(), p.c_str(), err, false) ) {
+ log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
+ return false;
+ }
+
+ return true;
+ }
+
+ void SyncSourceFeedback::ensureMe() {
+ string myname = getHostName();
+ {
+ Client::WriteContext ctx("local");
+ // local.me is an identifier for a server for getLastError w:2+
+ if (!Helpers::getSingleton("local.me", _me) ||
+ !_me.hasField("host") ||
+ _me["host"].String() != myname) {
+
+ // clean out local.me
+ Helpers::emptyCollection("local.me");
+
+ // repopulate
+ BSONObjBuilder b;
+ b.appendOID("_id", 0, true);
+ b.append("host", myname);
+ _me = b.obj();
+ Helpers::putSingleton("local.me", _me);
+ }
+ }
+ }
+
+ bool SyncSourceFeedback::replHandshake() {
+ ensureMe();
+
+ // handshake for us
+ BSONObjBuilder cmd;
+ cmd.append("replSetUpdatePosition", 1);
+ BSONObjBuilder sub (cmd.subobjStart("handshake"));
+ sub.appendAs(_me["_id"], "handshake");
+ sub.append("member", theReplSet->selfId());
+ sub.append("config", theReplSet->myConfig().asBson());
+ sub.doneFast();
+
+ BSONObj res;
+ try {
+ if (!_connection->runCommand("admin", cmd.obj(), res)) {
+ if (res["errmsg"].str().find("no such cmd") != std::string::npos) {
+ _supportsUpdater = false;
+ }
+ resetConnection();
+ return false;
+ }
+ else {
+ _supportsUpdater = true;
+ }
+ }
+ catch (const DBException& e) {
+ log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl;
+ resetConnection();
+ return false;
+ }
+
+ // handshakes for those connected to us
+ {
+ for (OIDMemberMap::iterator itr = _members.begin();
+ itr != _members.end(); ++itr) {
+ BSONObjBuilder slaveCmd;
+ slaveCmd.append("replSetUpdatePosition", 1);
+ // outer handshake indicates this is a handshake command
+ // inner is needed as part of the structure to be passed to gotHandshake
+ BSONObjBuilder slaveSub (slaveCmd.subobjStart("handshake"));
+ slaveSub.append("handshake", itr->first);
+ slaveSub.append("member", itr->second->id());
+ slaveSub.append("config", itr->second->config().asBson());
+ slaveSub.doneFast();
+ BSONObj slaveRes;
+ try {
+ if (!_connection->runCommand("admin", slaveCmd.obj(), slaveRes)) {
+ resetConnection();
+ return false;
+ }
+ }
+ catch (const DBException& e) {
+ log() << "SyncSourceFeedback error sending chained handshakes: "
+ << e.what() << endl;
+ resetConnection();
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ bool SyncSourceFeedback::_connect(const std::string& hostName) {
+ if (hasConnection()) {
+ return true;
+ }
+ _connection.reset(new DBClientConnection(false, 0, OplogReader::tcp_timeout));
+ string errmsg;
+ if (!_connection->connect(hostName.c_str(), errmsg) ||
+ (AuthorizationManager::isAuthEnabled() && !replAuthenticate(true))) {
+ resetConnection();
+ log() << "repl: " << errmsg << endl;
+ return false;
+ }
+
+ if (!replHandshake()) {
+ if (!supportsUpdater()) {
+ return connectOplogReader(hostName);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ bool SyncSourceFeedback::connect(const Member* target) {
+ boost::unique_lock<boost::mutex> lock(_connmtx);
+ resetConnection();
+ resetOplogReaderConnection();
+ _syncTarget = target;
+ if (_connect(target->fullName())) {
+ if (!supportsUpdater()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void SyncSourceFeedback::forwardSlaveHandshake() {
+ boost::unique_lock<boost::mutex> lock(_mtx);
+ _handshakeNeeded = true;
+ }
+
+ void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) {
+ boost::unique_lock<boost::mutex> lock(_mtx);
+ LOG(1) << "replSet last: " << _slaveMap[rid].toString() << " to " << ot.toString() << endl;
+ // only update if ot is newer than what we have already
+ if (ot > _slaveMap[rid]) {
+ _slaveMap[rid] = ot;
+ _positionChanged = true;
+ LOG(2) << "now last is " << _slaveMap[rid].toString() << endl;
+ _cond.notify_all();
+ }
+ }
+
+ bool SyncSourceFeedback::updateUpstream() {
+ if (theReplSet->isPrimary()) {
+ // primary has no one to update to
+ return true;
+ }
+ BSONObjBuilder cmd;
+ cmd.append("replSetUpdatePosition", 1);
+ // create an array containing objects each member connected to us and for ourself
+ BSONArrayBuilder array (cmd.subarrayStart("optimes"));
+ OID myID = _me["_id"].OID();
+ {
+ for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin();
+ itr != _slaveMap.end(); ++itr) {
+ BSONObjBuilder entry(array.subobjStart());
+ entry.append("_id", itr->first);
+ entry.append("optime", itr->second);
+ if (itr->first == myID) {
+ entry.append("config", theReplSet->myConfig().asBson());
+ }
+ else {
+ entry.append("config", _members[itr->first]->config().asBson());
+ }
+ entry.doneFast();
+ }
+ }
+ array.done();
+ BSONObj res;
+
+ bool ok;
+ try {
+ ok = _connection->runCommand("admin", cmd.obj(), res);
+ }
+ catch (const DBException& e) {
+ log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
+ resetConnection();
+ return false;
+ }
+ if (!ok) {
+ log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl;
+ resetConnection();
+ return false;
+ }
+ return true;
+ }
+
+ void SyncSourceFeedback::run() {
+ Client::initThread("SyncSourceFeedbackThread");
+ while (true) {
+ {
+ boost::unique_lock<boost::mutex> lock(_mtx);
+ while (!_positionChanged && !_handshakeNeeded) {
+ _cond.wait(lock);
+ }
+ boost::unique_lock<boost::mutex> conlock(_connmtx);
+ const Member* target = replset::BackgroundSync::get()->getSyncTarget();
+ if (_syncTarget != target) {
+ resetConnection();
+ _syncTarget = target;
+ }
+ if (!hasConnection()) {
+ // fix connection if need be
+ if (!target) {
+ continue;
+ }
+ if (!_connect(target->fullName())) {
+ continue;
+ }
+ else if (!supportsUpdater()) {
+ _handshakeNeeded = false;
+ _positionChanged = false;
+ continue;
+ }
+ }
+ if (_handshakeNeeded) {
+ if (!replHandshake()) {
+ _handshakeNeeded = true;
+ continue;
+ }
+ else {
+ _handshakeNeeded = false;
+ }
+ }
+ if (_positionChanged) {
+ if (!updateUpstream()) {
+ _positionChanged = true;
+ continue;
+ }
+ else {
+ _positionChanged = false;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
new file mode 100644
index 00000000000..72073841029
--- /dev/null
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -0,0 +1,171 @@
+/**
+* Copyright (C) 2013 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+
+#pragma once
+
+#include "mongo/db/repl/oplogreader.h"
+#include "mongo/util/background.h"
+
+
+namespace mongo {
+
+ class Member;
+
+ class SyncSourceFeedback : public BackgroundJob {
+ public:
+ SyncSourceFeedback() : BackgroundJob(false /*don't selfdelete*/),
+ _syncTarget(NULL),
+ _oplogReader(new OplogReader(true)),
+ _supportsUpdater(true) {}
+
+ ~SyncSourceFeedback() {
+ delete _oplogReader;
+ }
+
+ /// Adds an entry to _member for a secondary that has connected to us.
+ void associateMember(const BSONObj& id, const int memberId);
+
+ /// Passes handshake up the replication chain, upon receiving a handshake.
+ void forwardSlaveHandshake();
+
+ void updateSelfInMap(const OpTime& ot) {
+ ensureMe();
+ updateMap(_me["_id"].OID(), ot);
+ }
+
+ /// Connect to sync target and create OplogReader if needed.
+ bool connect(const Member* target);
+
+ void resetConnection() {
+ _connection.reset();
+ }
+
+ void resetOplogReaderConnection() {
+ _oplogReader->resetConnection();
+ }
+
+ /// Used extensively in bgsync, to see if we need to use the OplogReader syncing method.
+ bool supportsUpdater() const {
+ // oplogReader will be NULL if new updater is supported
+ //boost::unique_lock<boost::mutex> lock(_mtx);
+ return _supportsUpdater;
+ }
+
+ /// Updates the _slaveMap to be forwarded to the sync target.
+ void updateMap(const mongo::OID& rid, const OpTime& ot);
+
+ std::string name() const { return "SyncSourceFeedbackThread"; }
+
+ /// Loops forever, passing updates when they are present.
+ void run();
+
+ /* The below methods just fall through to OplogReader and are only used when our sync target
+ * does not support the update command.
+ */
+ bool connectOplogReader(const std::string& hostName) {
+ return _oplogReader->connect(hostName);
+ }
+
+ bool connect(const BSONObj& rid, const int from, const string& to) {
+ return _oplogReader->connect(rid, from, to);
+ }
+
+ void ghostQueryGTE(const char *ns, OpTime t) {
+ _oplogReader->ghostQueryGTE(ns, t);
+ }
+
+ bool haveCursor() {
+ return _oplogReader->haveCursor();
+ }
+
+ bool more() {
+ return _oplogReader->more();
+ }
+
+ bool moreInCurrentBatch() {
+ return _oplogReader->moreInCurrentBatch();
+ }
+
+ BSONObj nextSafe() {
+ return _oplogReader->nextSafe();
+ }
+
+ void tailCheck() {
+ _oplogReader->tailCheck();
+ }
+
+ void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) {
+ _oplogReader->tailingQueryGTE(ns, t, fields);
+ }
+
+ private:
+ /// Ensures local.me is populated and populates it if not.
+ void ensureMe();
+
+ /* Generally replAuthenticate will only be called within system threads to fully
+ * authenticate connections to other nodes in the cluster that will be used as part of
+ * internal operations. If a user-initiated action results in needing to call
+ * replAuthenticate, you can call it with skipAuthCheck set to false. Only do this if you
+ * are certain that the proper auth checks have already run to ensure that the user is
+ * authorized to do everything that this connection will be used for!
+ */
+ bool replAuthenticate(bool skipAuthCheck);
+
+ /* Sends initialization information to our sync target, also determines whether or not they
+ * support the updater command.
+ */
+ bool replHandshake();
+
+ /* Inform the sync target of our current position in the oplog, as well as the positions
+ * of all secondaries chained through us.
+ */
+ bool updateUpstream();
+
+ bool hasConnection() {
+ return _connection.get();
+ }
+
+ /// Connect to sync target and create OplogReader if needed.
+ bool _connect(const std::string& hostName);
+
+ // stores our OID to be passed along in commands
+ BSONObj _me;
+ // the member we are currently syncing from
+ const Member* _syncTarget;
+ // holds the oplogReader for use when we fall back to old style updates
+ OplogReader* _oplogReader;
+ // our connection to our sync target
+ boost::scoped_ptr<DBClientConnection> _connection;
+ // tracks whether we are in fallback mode or not
+ bool _supportsUpdater;
+ // protects connection
+ boost::mutex _connmtx;
+ // protects cond and maps and the indicator bools
+ boost::mutex _mtx;
+ // contains the most recent optime of each member syncing to us
+ map<mongo::OID, OpTime> _slaveMap;
+ typedef map<mongo::OID, Member*> OIDMemberMap;
+ // contains a pointer to each member, which we can look up by oid
+ OIDMemberMap _members;
+ // used to alert our thread of changes which need to be passed up the chain
+ boost::condition _cond;
+ // used to indicate a position change which has not yet been pushed along
+ bool _positionChanged;
+ // used to indicate a connection change which has not yet been shook on
+ bool _handshakeNeeded;
+ };
+}
diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp
index da5b04a91e4..a5e6a64bf4d 100644
--- a/src/mongo/db/repl/write_concern.cpp
+++ b/src/mongo/db/repl/write_concern.cpp
@@ -126,20 +126,22 @@ namespace mongo {
scoped_lock mylk(_mutex);
- _slaves[ident] = last;
- _dirty = true;
+ if (last > _slaves[ident]) {
+ _slaves[ident] = last;
+ _dirty = true;
- if (theReplSet && theReplSet->isPrimary()) {
- theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
- }
+ if (theReplSet && theReplSet->isPrimary()) {
+ theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
+ }
- if ( ! _started ) {
- // start background thread here since we definitely need it
- _started = true;
- go();
+ if ( ! _started ) {
+ // start background thread here since we definitely need it
+ _started = true;
+ go();
+ }
+
+ _threadsWaitingForReplication.notify_all();
}
-
- _threadsWaitingForReplication.notify_all();
}
bool opReplicatedEnough( OpTime op , BSONElement w ) {
@@ -257,6 +259,28 @@ namespace mongo {
const char * SlaveTracking::NS = "local.slaves";
+ // parse optimes from replUpdatePositionCommand and pass them to SyncSourceFeedback
+ void updateSlaveLocations(BSONArray optimes) {
+ BSONForEach(elem, optimes) {
+ BSONObj entry = elem.Obj();
+ BSONObj id = BSON("_id" << entry["_id"].OID());
+ OpTime ot = entry["optime"]._opTime();
+ BSONObj config = entry["config"].Obj();
+
+ // update locally
+ slaveTracking.update(id, config, "local.oplog.rs", ot);
+ if (theReplSet && !theReplSet->isPrimary()) {
+ // pass along if we are not primary
+ theReplSet->syncSourceFeedback.updateMap(entry["_id"].OID(), ot);
+ // for to be backwards compatible
+ theReplSet->ghost->send(boost::bind(&GhostSync::percolate,
+ theReplSet->ghost,
+ id,
+ ot));
+ }
+ }
+ }
+
void updateSlaveLocation( CurOp& curop, const char * ns , OpTime lastOp ) {
if ( lastOp.isNull() )
return;
diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h
index f80df329a4b..8a6d695fb9c 100644
--- a/src/mongo/db/repl/write_concern.h
+++ b/src/mongo/db/repl/write_concern.h
@@ -29,6 +29,8 @@ namespace mongo {
class CurOp;
+ void updateSlaveLocations(BSONArray optimes);
+
void updateSlaveLocation( CurOp& curop, const char * oplog_ns , OpTime lastOp );
/** @return true if op has made it to w servers */