diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/auth/action_types.txt | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/health.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 308 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.h | 168 | ||||
-rw-r--r-- | src/mongo/db/repl/write_concern.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/repl/write_concern.h | 2 |
14 files changed, 614 insertions, 45 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 50a9c2838c2..ba5c577e989 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -448,6 +448,7 @@ serverOnlyFiles = [ "db/curop.cpp", "db/repl/master_slave.cpp", "db/repl/finding_start_cursor.cpp", "db/repl/sync.cpp", + "db/repl/sync_source_feedback.cpp", "db/repl/optime.cpp", "db/repl/oplogreader.cpp", "db/repl/replication_server_status.cpp", diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt index 057084293a3..bebec04b78e 100644 --- a/src/mongo/db/auth/action_types.txt +++ b/src/mongo/db/auth/action_types.txt @@ -71,6 +71,7 @@ "replSetReconfig", "replSetStepDown", "replSetSyncFrom", +"replSetUpdatePosition", "resync", "serverStatus", "setParameter", diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 6b18f1f8c68..f9cdc96ec9f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -22,6 +22,7 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/rs.h" #include "mongo/util/fail_point_service.h" #include "mongo/base/counter.h" #include "mongo/db/stats/timer_stats.h" @@ -79,7 +80,6 @@ namespace replset { _assumingPrimary(false), _currentSyncTarget(NULL), _oplogMarkerTarget(NULL), - _oplogMarker(true /* doHandshake */), _consumedOpTime(0, 0) { } @@ -122,6 +122,7 @@ namespace replset { void BackgroundSync::notifierThread() { Client::initThread("rsSyncNotifier"); replLocalAuth(); + theReplSet->syncSourceFeedback.go(); while (!inShutdown()) { bool clearTarget = false; @@ -168,37 +169,44 @@ namespace replset { } void BackgroundSync::markOplog() { - LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog; + LOG(3) << "replset markOplog: " << _consumedOpTime << " " + << theReplSet->lastOpTimeWritten << rsLog; - if (!hasCursor()) { - sleepsecs(1); - return; + if (theReplSet->syncSourceFeedback.supportsUpdater()) { + theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten); + _consumedOpTime = theReplSet->lastOpTimeWritten; } + else { + if (!hasCursor()) { + return; + } - if (!_oplogMarker.moreInCurrentBatch()) { - _oplogMarker.more(); - } + if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) { + theReplSet->syncSourceFeedback.more(); + } - if (!_oplogMarker.more()) { - _oplogMarker.tailCheck(); - sleepsecs(1); - return; - } + if (!theReplSet->syncSourceFeedback.more()) { + theReplSet->syncSourceFeedback.tailCheck(); + return; + } - // if this member has written the op at optime T, we want to nextSafe up to and including T - while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) { - BSONObj temp = _oplogMarker.nextSafe(); - _consumedOpTime = temp["ts"]._opTime(); - } + // if this member has written the op at optime T + // we want to nextSafe up to and including T + while (_consumedOpTime < theReplSet->lastOpTimeWritten + && theReplSet->syncSourceFeedback.more()) { + BSONObj temp = theReplSet->syncSourceFeedback.nextSafe(); + _consumedOpTime = temp["ts"]._opTime(); + } - // call more() to signal the sync target that we've synced T - _oplogMarker.more(); + // call more() to signal the sync target that we've synced T + theReplSet->syncSourceFeedback.more(); + } } bool BackgroundSync::hasCursor() { { // prevent writers from blocking readers during fsync - SimpleMutex::scoped_lock fsynclk(filesLockedFsync); + SimpleMutex::scoped_lock fsynclk(filesLockedFsync); // we don't need the local write lock yet, but it's needed by OplogReader::connect // so we take it preemptively to avoid deadlocking. Lock::DBWrite lk("local"); @@ -210,25 +218,23 @@ namespace replset { return false; } - log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog; + log() << "replset setting oplog notifier to " + << _currentSyncTarget->fullName() << rsLog; _oplogMarkerTarget = _currentSyncTarget; - _oplogMarker.resetConnection(); - - if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) { - LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog; + if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget->fullName())) { _oplogMarkerTarget = NULL; return false; } } } - - if (!_oplogMarker.haveCursor()) { + if (!theReplSet->syncSourceFeedback.haveCursor()) { BSONObj fields = BSON("ts" << 1); - _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields); + theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog, + theReplSet->lastOpTimeWritten, &fields); } - return _oplogMarker.haveCursor(); + return theReplSet->syncSourceFeedback.haveCursor(); } void BackgroundSync::producerThread() { @@ -524,6 +530,8 @@ namespace replset { _currentSyncTarget = target; } + theReplSet->syncSourceFeedback.connect(current); + return; } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 1f4cb3b1530..02d359cfee5 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -86,7 +86,6 @@ namespace replset { boost::mutex _lastOpMutex; const Member* _oplogMarkerTarget; - OplogReader _oplogMarker; // not locked, only used by notifier thread OpTime _consumedOpTime; // not locked, only used by notifier thread BackgroundSync(); diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp index 55fd017ca98..1fa974d6889 100644 --- a/src/mongo/db/repl/health.cpp +++ b/src/mongo/db/repl/health.cpp @@ -349,6 +349,15 @@ namespace mongo { return 0; } + Member* ReplSetImpl::getMutableMember(unsigned id) { + if( _self && id == _self->id() ) return _self; + + for( Member *m = head(); m; m = m->next() ) + if( m->id() == id ) + return m; + return 0; + } + Member* ReplSetImpl::findByName(const std::string& hostname) const { if (_self && hostname == _self->fullName()) { return _self; diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index d716eb2e49f..b2154ba0868 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -144,7 +144,7 @@ namespace mongo { if( conn() == 0 ) { _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, 0, - 30 /* tcp timeout */)); + tcp_timeout)); string errmsg; if ( !_conn->connect(hostName.c_str(), errmsg) || (AuthorizationManager::isAuthEnabled() && !replAuthenticate(_conn.get(), true)) ) { diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 801b9a4854e..345dc2be9ee 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -50,6 +50,9 @@ namespace mongo { return findOne(ns, Query().sort(reverseNaturalObj)); } + /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ + static const int tcp_timeout = 30; + /* ok to call if already connected */ bool connect(const std::string& hostname); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index ef6dcd8e7a6..26340d84b6b 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -25,6 +25,7 @@ #include "mongo/db/commands.h" #include "mongo/db/dbwebserver.h" #include "mongo/db/repl/health.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_server_status.h" // replSettings #include "mongo/db/repl/rs.h" #include "mongo/db/repl/rs_config.h" @@ -393,6 +394,44 @@ namespace mongo { } } cmdReplSetSyncFrom; + class CmdReplSetUpdatePosition: public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "internal"; + } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::replSetUpdatePosition); + out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions)); + } + CmdReplSetUpdatePosition() : ReplSetCommand("replSetUpdatePosition") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, + BSONObjBuilder& result, bool fromRepl) { + if (!check(errmsg, result)) + return false; + + if (cmdObj.hasField("handshake")) { + // we have received a handshake, not an update message + // handshakes are done here to ensure the receiving end supports the update command + cc().gotHandshake(cmdObj["handshake"].embeddedObject()); + // if we aren't primary, pass the handshake along + if (!theReplSet->isPrimary() && theReplSet->syncSourceFeedback.supportsUpdater()) { + theReplSet->syncSourceFeedback.forwardSlaveHandshake(); + } + return true; + } + + uassert(16888, "optimes field should be an array with an object for each secondary", + cmdObj["optimes"].type() == Array); + BSONArray newTimes = BSONArray(cmdObj["optimes"].Obj()); + updateSlaveLocations(newTimes); + + return true; + } + } cmdReplSetUpdatePosition; + using namespace bson; using namespace mongoutils::html; extern void fillRsLog(stringstream&); diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp index 3baf4d08adc..7290e24e7d6 100644 --- a/src/mongo/db/repl/rs.cpp +++ b/src/mongo/db/repl/rs.cpp @@ -925,8 +925,11 @@ namespace mongo { void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) { // To prevent race conditions with clearing the cache at reconfig time, // we lock the replset mutex here. - lock lk(this); - ghost->associateSlave(rid, memberId); + { + lock lk(this); + ghost->associateSlave(rid, memberId); + } + syncSourceFeedback.associateMember(rid, memberId); } class ReplIndexPrefetch : public ServerParameter { diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h index 4190ded55c1..74d36edbb23 100644 --- a/src/mongo/db/repl/rs.h +++ b/src/mongo/db/repl/rs.h @@ -26,6 +26,7 @@ #include "mongo/db/repl/rs_exception.h" #include "mongo/db/repl/rs_member.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/sync_source_feedback.h" #include "mongo/util/concurrency/list.h" #include "mongo/util/concurrency/msg.h" #include "mongo/util/concurrency/thread_pool.h" @@ -341,6 +342,8 @@ namespace mongo { StateBox box; + SyncSourceFeedback syncSourceFeedback; + OpTime lastOpTimeWritten; long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork" bool forceSyncFrom(const string& host, string& errmsg, BSONObjBuilder& result); @@ -505,6 +508,7 @@ namespace mongo { Member* head() const { return _members.head(); } public: const Member* findById(unsigned id) const; + Member* getMutableMember(unsigned id); Member* findByName(const std::string& hostname) const; private: void _getTargets(list<Target>&, int &configVersion); diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp new file mode 100644 index 00000000000..2205bb808ad --- /dev/null +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -0,0 +1,308 @@ +/** +* Copyright (C) 2013 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "mongo/db/repl/sync_source_feedback.h" + +#include "mongo/client/constants.h" +#include "mongo/client/dbclientcursor.h" +#include "mongo/db/auth/authorization_manager.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/rs.h" // theReplSet + +namespace mongo { + + // used in replAuthenticate + static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); + + void SyncSourceFeedback::associateMember(const BSONObj& id, const int memberId) { + const OID rid = id["_id"].OID(); + boost::unique_lock<boost::mutex> lock(_mtx); + _handshakeNeeded = true; + _members[rid] = theReplSet->getMutableMember(memberId); + _cond.notify_all(); + } + + bool SyncSourceFeedback::replAuthenticate(bool skipAuthCheck) { + + if (!AuthorizationManager::isAuthEnabled()) { + return true; + } + if (!skipAuthCheck && !cc().getAuthorizationSession()->hasInternalAuthorization()) { + log() << "replauthenticate: requires internal authorization, failing" << endl; + return false; + } + + string u; + string p; + if (internalSecurity.pwd.length() > 0) { + u = internalSecurity.user; + p = internalSecurity.pwd; + } + else { + BSONObj user; + { + Client::ReadContext ctxt("local."); + if(!Helpers::findOne("local.system.users", userReplQuery, user) || + // try the first user in local + !Helpers::getSingleton("local.system.users", user)) { + log() << "replauthenticate: no user in local.system.users to use" + << "for authentication" << endl; + return false; + } + } + u = user.getStringField("user"); + p = user.getStringField("pwd"); + massert(16886, "bad user object? [1]", !u.empty()); + massert(16887, "bad user object? [2]", !p.empty()); + } + + string err; + if( !_connection->auth("local", u.c_str(), p.c_str(), err, false) ) { + log() << "replauthenticate: can't authenticate to master server, user:" << u << endl; + return false; + } + + return true; + } + + void SyncSourceFeedback::ensureMe() { + string myname = getHostName(); + { + Client::WriteContext ctx("local"); + // local.me is an identifier for a server for getLastError w:2+ + if (!Helpers::getSingleton("local.me", _me) || + !_me.hasField("host") || + _me["host"].String() != myname) { + + // clean out local.me + Helpers::emptyCollection("local.me"); + + // repopulate + BSONObjBuilder b; + b.appendOID("_id", 0, true); + b.append("host", myname); + _me = b.obj(); + Helpers::putSingleton("local.me", _me); + } + } + } + + bool SyncSourceFeedback::replHandshake() { + ensureMe(); + + // handshake for us + BSONObjBuilder cmd; + cmd.append("replSetUpdatePosition", 1); + BSONObjBuilder sub (cmd.subobjStart("handshake")); + sub.appendAs(_me["_id"], "handshake"); + sub.append("member", theReplSet->selfId()); + sub.append("config", theReplSet->myConfig().asBson()); + sub.doneFast(); + + BSONObj res; + try { + if (!_connection->runCommand("admin", cmd.obj(), res)) { + if (res["errmsg"].str().find("no such cmd") != std::string::npos) { + _supportsUpdater = false; + } + resetConnection(); + return false; + } + else { + _supportsUpdater = true; + } + } + catch (const DBException& e) { + log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl; + resetConnection(); + return false; + } + + // handshakes for those connected to us + { + for (OIDMemberMap::iterator itr = _members.begin(); + itr != _members.end(); ++itr) { + BSONObjBuilder slaveCmd; + slaveCmd.append("replSetUpdatePosition", 1); + // outer handshake indicates this is a handshake command + // inner is needed as part of the structure to be passed to gotHandshake + BSONObjBuilder slaveSub (slaveCmd.subobjStart("handshake")); + slaveSub.append("handshake", itr->first); + slaveSub.append("member", itr->second->id()); + slaveSub.append("config", itr->second->config().asBson()); + slaveSub.doneFast(); + BSONObj slaveRes; + try { + if (!_connection->runCommand("admin", slaveCmd.obj(), slaveRes)) { + resetConnection(); + return false; + } + } + catch (const DBException& e) { + log() << "SyncSourceFeedback error sending chained handshakes: " + << e.what() << endl; + resetConnection(); + return false; + } + } + } + return true; + } + + bool SyncSourceFeedback::_connect(const std::string& hostName) { + if (hasConnection()) { + return true; + } + _connection.reset(new DBClientConnection(false, 0, OplogReader::tcp_timeout)); + string errmsg; + if (!_connection->connect(hostName.c_str(), errmsg) || + (AuthorizationManager::isAuthEnabled() && !replAuthenticate(true))) { + resetConnection(); + log() << "repl: " << errmsg << endl; + return false; + } + + if (!replHandshake()) { + if (!supportsUpdater()) { + return connectOplogReader(hostName); + } + return false; + } + return true; + } + + bool SyncSourceFeedback::connect(const std::string& hostName) { + boost::unique_lock<boost::mutex> lock(_connmtx); + resetConnection(); + resetOplogReaderConnection(); + if (_connect(hostName)) { + if (!supportsUpdater()) { + return true; + } + } + return false; + } + + void SyncSourceFeedback::forwardSlaveHandshake() { + boost::unique_lock<boost::mutex> lock(_mtx); + _handshakeNeeded = true; + } + + void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) { + boost::unique_lock<boost::mutex> lock(_mtx); + LOG(1) << "replSet last: " << _slaveMap[rid].toString() << " to " << ot.toString() << endl; + // only update if ot is newer than what we have already + if (ot > _slaveMap[rid]) { + _slaveMap[rid] = ot; + _positionChanged = true; + LOG(2) << "now last is " << _slaveMap[rid].toString() << endl; + _cond.notify_all(); + } + } + + bool SyncSourceFeedback::updateUpstream() { + if (theReplSet->isPrimary()) { + // primary has no one to update to + return true; + } + BSONObjBuilder cmd; + cmd.append("replSetUpdatePosition", 1); + // create an array containing objects each member connected to us and for ourself + BSONArrayBuilder array (cmd.subarrayStart("optimes")); + OID myID = _me["_id"].OID(); + { + for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin(); + itr != _slaveMap.end(); ++itr) { + BSONObjBuilder entry(array.subobjStart()); + entry.append("_id", itr->first); + entry.append("optime", itr->second); + if (itr->first == myID) { + entry.append("config", theReplSet->myConfig().asBson()); + } + else { + entry.append("config", _members[itr->first]->config().asBson()); + } + entry.doneFast(); + } + } + array.done(); + BSONObj res; + + bool ok; + try { + ok = _connection->runCommand("admin", cmd.obj(), res); + } + catch (const DBException& e) { + log() << "SyncSourceFeedback error sending update: " << e.what() << endl; + resetConnection(); + return false; + } + if (!ok) { + log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl; + resetConnection(); + return false; + } + return true; + } + + void SyncSourceFeedback::run() { + Client::initThread("SyncSourceFeedbackThread"); + while (true) { + { + boost::unique_lock<boost::mutex> lock(_mtx); + while (!_positionChanged && !_handshakeNeeded) { + _cond.wait(lock); + } + boost::unique_lock<boost::mutex> conlock(_connmtx); + if (!hasConnection()) { + // fix connection if need be + const Member* target = replset::BackgroundSync::get()->getSyncTarget(); + if (!target) { + continue; + } + if (!_connect(target->fullName())) { + continue; + } + else if (!supportsUpdater()) { + _handshakeNeeded = false; + _positionChanged = false; + continue; + } + } + if (_handshakeNeeded) { + if (!replHandshake()) { + _handshakeNeeded = true; + continue; + } + else { + _handshakeNeeded = false; + } + } + if (_positionChanged) { + if (!updateUpstream()) { + _positionChanged = true; + continue; + } + else { + _positionChanged = false; + } + } + } + } + } +} diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h new file mode 100644 index 00000000000..278baba30e5 --- /dev/null +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -0,0 +1,168 @@ +/** +* Copyright (C) 2013 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + + +#pragma once + +#include "mongo/db/repl/oplogreader.h" +#include "mongo/util/background.h" + + +namespace mongo { + + class Member; + + class SyncSourceFeedback : public BackgroundJob { + public: + SyncSourceFeedback() : BackgroundJob(false /*don't selfdelete*/), + _oplogReader(new OplogReader(true)), + _supportsUpdater(true) {} + + ~SyncSourceFeedback() { + delete _oplogReader; + } + + /// Adds an entry to _member for a secondary that has connected to us. + void associateMember(const BSONObj& id, const int memberId); + + /// Passes handshake up the replication chain, upon receiving a handshake. + void forwardSlaveHandshake(); + + void updateSelfInMap(const OpTime& ot) { + ensureMe(); + updateMap(_me["_id"].OID(), ot); + } + + /// Connect to sync target and create OplogReader if needed. + bool connect(const std::string& hostName); + + void resetConnection() { + _connection.reset(); + } + + void resetOplogReaderConnection() { + _oplogReader->resetConnection(); + } + + /// Used extensively in bgsync, to see if we need to use the OplogReader syncing method. + bool supportsUpdater() const { + // oplogReader will be NULL if new updater is supported + //boost::unique_lock<boost::mutex> lock(_mtx); + return _supportsUpdater; + } + + /// Updates the _slaveMap to be forwarded to the sync target. + void updateMap(const mongo::OID& rid, const OpTime& ot); + + std::string name() const { return "SyncSourceFeedbackThread"; } + + /// Loops forever, passing updates when they are present. + void run(); + + /* The below methods just fall through to OplogReader and are only used when our sync target + * does not support the update command. + */ + bool connectOplogReader(const std::string& hostName) { + return _oplogReader->connect(hostName); + } + + bool connect(const BSONObj& rid, const int from, const string& to) { + return _oplogReader->connect(rid, from, to); + } + + void ghostQueryGTE(const char *ns, OpTime t) { + _oplogReader->ghostQueryGTE(ns, t); + } + + bool haveCursor() { + return _oplogReader->haveCursor(); + } + + bool more() { + return _oplogReader->more(); + } + + bool moreInCurrentBatch() { + return _oplogReader->moreInCurrentBatch(); + } + + BSONObj nextSafe() { + return _oplogReader->nextSafe(); + } + + void tailCheck() { + _oplogReader->tailCheck(); + } + + void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) { + _oplogReader->tailingQueryGTE(ns, t, fields); + } + + private: + /// Ensures local.me is populated and populates it if not. + void ensureMe(); + + /* Generally replAuthenticate will only be called within system threads to fully + * authenticate connections to other nodes in the cluster that will be used as part of + * internal operations. If a user-initiated action results in needing to call + * replAuthenticate, you can call it with skipAuthCheck set to false. Only do this if you + * are certain that the proper auth checks have already run to ensure that the user is + * authorized to do everything that this connection will be used for! + */ + bool replAuthenticate(bool skipAuthCheck); + + /* Sends initialization information to our sync target, also determines whether or not they + * support the updater command. + */ + bool replHandshake(); + + /* Inform the sync target of our current position in the oplog, as well as the positions + * of all secondaries chained through us. + */ + bool updateUpstream(); + + bool hasConnection() { + return _connection.get(); + } + + /// Connect to sync target and create OplogReader if needed. + bool _connect(const std::string& hostName); + + // stores our OID to be passed along in commands + BSONObj _me; + // holds the oplogReader for use when we fall back to old style updates + OplogReader* _oplogReader; + // our connection to our sync target + boost::scoped_ptr<DBClientConnection> _connection; + // tracks whether we are in fallback mode or not + bool _supportsUpdater; + // protects connection + boost::mutex _connmtx; + // protects cond and maps and the indicator bools + boost::mutex _mtx; + // contains the most recent optime of each member syncing to us + map<mongo::OID, OpTime> _slaveMap; + typedef map<mongo::OID, Member*> OIDMemberMap; + // contains a pointer to each member, which we can look up by oid + OIDMemberMap _members; + // used to alert our thread of changes which need to be passed up the chain + boost::condition _cond; + // used to indicate a position change which has not yet been pushed along + bool _positionChanged; + // used to indicate a connection change which has not yet been shook on + bool _handshakeNeeded; + }; +} diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp index 0564bc0a678..d05575f2135 100644 --- a/src/mongo/db/repl/write_concern.cpp +++ b/src/mongo/db/repl/write_concern.cpp @@ -122,20 +122,22 @@ namespace mongo { scoped_lock mylk(_mutex); - _slaves[ident] = last; - _dirty = true; + if (last > _slaves[ident]) { + _slaves[ident] = last; + _dirty = true; - if (theReplSet && theReplSet->isPrimary()) { - theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last); - } + if (theReplSet && theReplSet->isPrimary()) { + theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last); + } - if ( ! _started ) { - // start background thread here since we definitely need it - _started = true; - go(); + if ( ! _started ) { + // start background thread here since we definitely need it + _started = true; + go(); + } + + _threadsWaitingForReplication.notify_all(); } - - _threadsWaitingForReplication.notify_all(); } bool opReplicatedEnough( OpTime op , BSONElement w ) { @@ -253,6 +255,28 @@ namespace mongo { const char * SlaveTracking::NS = "local.slaves"; + // parse optimes from replUpdatePositionCommand and pass them to SyncSourceFeedback + void updateSlaveLocations(BSONArray optimes) { + BSONForEach(elem, optimes) { + BSONObj entry = elem.Obj(); + BSONObj id = BSON("_id" << entry["_id"].OID()); + OpTime ot = entry["optime"]._opTime(); + BSONObj config = entry["config"].Obj(); + + // update locally + slaveTracking.update(id, config, "local.oplog.rs", ot); + if (theReplSet && !theReplSet->isPrimary()) { + // pass along if we are not primary + theReplSet->syncSourceFeedback.updateMap(entry["_id"].OID(), ot); + // for to be backwards compatible + theReplSet->ghost->send(boost::bind(&GhostSync::percolate, + theReplSet->ghost, + id, + ot)); + } + } + } + void updateSlaveLocation( CurOp& curop, const char * ns , OpTime lastOp ) { if ( lastOp.isNull() ) return; diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h index d9b0163635b..b28ae1ae830 100644 --- a/src/mongo/db/repl/write_concern.h +++ b/src/mongo/db/repl/write_concern.h @@ -29,6 +29,8 @@ namespace mongo { class CurOp; + void updateSlaveLocations(BSONArray optimes); + void updateSlaveLocation( CurOp& curop, const char * oplog_ns , OpTime lastOp ); /** @return true if op has made it to w servers */ |