diff options
author | Eric Milkie <milkie@10gen.com> | 2013-07-12 14:24:08 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2013-07-12 15:14:09 -0400 |
commit | 6486b4035c5ac52679eb3e1a034c925ccdd20deb (patch) | |
tree | c74c2fe36c819534e7444cdbe1aa12f533664304 /src/mongo | |
parent | ac386485a9b9f141a518f2aca1a2b66b8f976104 (diff) | |
download | mongo-6486b4035c5ac52679eb3e1a034c925ccdd20deb.tar.gz |
Revert "SERVER-6071 use command on local.slaves instead of cursor"
This reverts commit 2267744af0e972bceccb4ff4e9ed19a1ed639d2e.
Revert "SERVER-6071 correct assert code"
This reverts commit 85ca38c33e4aaebad539e78a05aed329b09c1208.
Conflicts:
src/mongo/db/repl/sync_source_feedback.cpp
Revert "SERVER-6071 use command on local.slaves instead of cursor"
This reverts commit 83ecb9775b7997dd5115c53f6ea30d2e368a4244.
Conflicts:
src/mongo/db/repl/replset_commands.cpp
src/mongo/db/repl/sync_source_feedback.cpp
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/auth/action_types.txt | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/health.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/rs.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 310 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.h | 171 | ||||
-rw-r--r-- | src/mongo/db/repl/write_concern.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/repl/write_concern.h | 2 |
14 files changed, 45 insertions, 619 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 464bc81b720..d61190c3188 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -456,7 +456,6 @@ serverOnlyFiles = [ "db/curop.cpp", "db/repl/master_slave.cpp", "db/repl/finding_start_cursor.cpp", "db/repl/sync.cpp", - "db/repl/sync_source_feedback.cpp", "db/repl/optime.cpp", "db/repl/oplogreader.cpp", "db/repl/replication_server_status.cpp", diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt index bebec04b78e..057084293a3 100644 --- a/src/mongo/db/auth/action_types.txt +++ b/src/mongo/db/auth/action_types.txt @@ -71,7 +71,6 @@ "replSetReconfig", "replSetStepDown", "replSetSyncFrom", -"replSetUpdatePosition", "resync", "serverStatus", "setParameter", diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index f453664262d..6e9e7538f5a 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -22,7 +22,6 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/rs_sync.h" -#include "mongo/db/repl/rs.h" #include "mongo/util/fail_point_service.h" #include "mongo/base/counter.h" #include "mongo/db/stats/timer_stats.h" @@ -80,6 +79,7 @@ namespace replset { _assumingPrimary(false), _currentSyncTarget(NULL), _oplogMarkerTarget(NULL), + _oplogMarker(true /* doHandshake */), _consumedOpTime(0, 0) { } @@ -122,7 +122,6 @@ namespace replset { void BackgroundSync::notifierThread() { Client::initThread("rsSyncNotifier"); replLocalAuth(); - theReplSet->syncSourceFeedback.go(); while (!inShutdown()) { bool clearTarget = false; @@ -169,44 +168,37 @@ namespace replset { } void BackgroundSync::markOplog() { - LOG(3) << "replset markOplog: " << _consumedOpTime << " " - << theReplSet->lastOpTimeWritten << rsLog; + LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog; - if (theReplSet->syncSourceFeedback.supportsUpdater()) { - theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten); - _consumedOpTime = theReplSet->lastOpTimeWritten; + if (!hasCursor()) { + sleepsecs(1); + return; } - else { - if (!hasCursor()) { - return; - } - if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) { - theReplSet->syncSourceFeedback.more(); - } - - if (!theReplSet->syncSourceFeedback.more()) { - theReplSet->syncSourceFeedback.tailCheck(); - return; - } + if (!_oplogMarker.moreInCurrentBatch()) { + _oplogMarker.more(); + } - // if this member has written the op at optime T - // we want to nextSafe up to and including T - while (_consumedOpTime < theReplSet->lastOpTimeWritten - && theReplSet->syncSourceFeedback.more()) { - BSONObj temp = theReplSet->syncSourceFeedback.nextSafe(); - _consumedOpTime = temp["ts"]._opTime(); - } + if (!_oplogMarker.more()) { + _oplogMarker.tailCheck(); + sleepsecs(1); + return; + } - // call more() to signal the sync target that we've synced T - theReplSet->syncSourceFeedback.more(); + // if this member has written the op at optime T, we want to nextSafe up to and including T + while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) { + BSONObj temp = _oplogMarker.nextSafe(); + _consumedOpTime = temp["ts"]._opTime(); } + + // call more() to signal the sync target that we've synced T + _oplogMarker.more(); } bool BackgroundSync::hasCursor() { { // prevent writers from blocking readers during fsync - SimpleMutex::scoped_lock fsynclk(filesLockedFsync); + SimpleMutex::scoped_lock fsynclk(filesLockedFsync); // we don't need the local write lock yet, but it's needed by OplogReader::connect // so we take it preemptively to avoid deadlocking. Lock::DBWrite lk("local"); @@ -218,23 +210,25 @@ namespace replset { return false; } - log() << "replset setting oplog notifier to " - << _currentSyncTarget->fullName() << rsLog; + log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog; _oplogMarkerTarget = _currentSyncTarget; - if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) { + _oplogMarker.resetConnection(); + + if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) { + LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog; _oplogMarkerTarget = NULL; return false; } } } - if (!theReplSet->syncSourceFeedback.haveCursor()) { + + if (!_oplogMarker.haveCursor()) { BSONObj fields = BSON("ts" << 1); - theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog, - theReplSet->lastOpTimeWritten, &fields); + _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields); } - return theReplSet->syncSourceFeedback.haveCursor(); + return _oplogMarker.haveCursor(); } void BackgroundSync::producerThread() { @@ -531,8 +525,6 @@ namespace replset { _currentSyncTarget = target; } - theReplSet->syncSourceFeedback.connect(target); - return; } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 02d359cfee5..1f4cb3b1530 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -86,6 +86,7 @@ namespace replset { boost::mutex _lastOpMutex; const Member* _oplogMarkerTarget; + OplogReader _oplogMarker; // not locked, only used by notifier thread OpTime _consumedOpTime; // not locked, only used by notifier thread BackgroundSync(); diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp index 297ebde93f5..3a2681128f8 100644 --- a/src/mongo/db/repl/health.cpp +++ b/src/mongo/db/repl/health.cpp @@ -348,15 +348,6 @@ namespace mongo { return 0; } - Member* ReplSetImpl::getMutableMember(unsigned id) { - if( _self && id == _self->id() ) return _self; - - for( Member *m = head(); m; m = m->next() ) - if( m->id() == id ) - return m; - return 0; - } - Member* ReplSetImpl::findByName(const std::string& hostname) const { if (_self && hostname == _self->fullName()) { return _self; diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index ea55ada5c11..5ee5a4628b4 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -140,7 +140,7 @@ namespace mongo { if( conn() == 0 ) { _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, 0, - tcp_timeout)); + 30 /* tcp timeout */)); string errmsg; if ( !_conn->connect(hostName.c_str(), errmsg) || (AuthorizationManager::isAuthEnabled() && !replAuthenticate(_conn.get(), true)) ) { diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 345dc2be9ee..801b9a4854e 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -50,9 +50,6 @@ namespace mongo { return findOne(ns, Query().sort(reverseNaturalObj)); } - /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ - static const int tcp_timeout = 30; - /* ok to call if already connected */ bool connect(const std::string& hostname); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index b20cfa8e34b..e8de0043ea8 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -24,7 +24,6 @@ #include "mongo/db/cmdline.h" #include "mongo/db/commands.h" #include "mongo/db/repl/health.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_server_status.h" // replSettings #include "mongo/db/repl/rs.h" #include "mongo/db/repl/rs_config.h" @@ -392,42 +391,4 @@ namespace mongo { } } cmdReplSetSyncFrom; - class CmdReplSetUpdatePosition: public ReplSetCommand { - public: - virtual void help( stringstream &help ) const { - help << "internal"; - } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::replSetUpdatePosition); - out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions)); - } - CmdReplSetUpdatePosition() : ReplSetCommand("replSetUpdatePosition") { } - virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, - BSONObjBuilder& result, bool fromRepl) { - if (!check(errmsg, result)) - return false; - - if (cmdObj.hasField("handshake")) { - // we have received a handshake, not an update message - // handshakes are done here to ensure the receiving end supports the update command - cc().gotHandshake(cmdObj["handshake"].embeddedObject()); - // if we aren't primary, pass the handshake along - if (!theReplSet->isPrimary() && theReplSet->syncSourceFeedback.supportsUpdater()) { - theReplSet->syncSourceFeedback.forwardSlaveHandshake(); - } - return true; - } - - uassert(16888, "optimes field should be an array with an object for each secondary", - cmdObj["optimes"].type() == Array); - BSONArray newTimes = BSONArray(cmdObj["optimes"].Obj()); - updateSlaveLocations(newTimes); - - return true; - } - } cmdReplSetUpdatePosition; - } diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp index e0c58423611..0f539601485 100644 --- a/src/mongo/db/repl/rs.cpp +++ b/src/mongo/db/repl/rs.cpp @@ -925,11 +925,8 @@ namespace mongo { void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) { // To prevent race conditions with clearing the cache at reconfig time, // we lock the replset mutex here. - { - lock lk(this); - ghost->associateSlave(rid, memberId); - } - syncSourceFeedback.associateMember(rid, memberId); + lock lk(this); + ghost->associateSlave(rid, memberId); } class ReplIndexPrefetch : public ServerParameter { diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h index 74d36edbb23..4190ded55c1 100644 --- a/src/mongo/db/repl/rs.h +++ b/src/mongo/db/repl/rs.h @@ -26,7 +26,6 @@ #include "mongo/db/repl/rs_exception.h" #include "mongo/db/repl/rs_member.h" #include "mongo/db/repl/rs_sync.h" -#include "mongo/db/repl/sync_source_feedback.h" #include "mongo/util/concurrency/list.h" #include "mongo/util/concurrency/msg.h" #include "mongo/util/concurrency/thread_pool.h" @@ -342,8 +341,6 @@ namespace mongo { StateBox box; - SyncSourceFeedback syncSourceFeedback; - OpTime lastOpTimeWritten; long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork" bool forceSyncFrom(const string& host, string& errmsg, BSONObjBuilder& result); @@ -508,7 +505,6 @@ namespace mongo { Member* head() const { return _members.head(); } public: const Member* findById(unsigned id) const; - Member* getMutableMember(unsigned id); Member* findByName(const std::string& hostname) const; private: void _getTargets(list<Target>&, int &configVersion); diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp deleted file mode 100644 index 21a7bffd3c4..00000000000 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ /dev/null @@ -1,310 +0,0 @@ -/** -* Copyright (C) 2013 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "mongo/db/repl/sync_source_feedback.h" - -#include "mongo/client/constants.h" -#include "mongo/client/dbclientcursor.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/auth/security_key.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/repl/bgsync.h" -#include "mongo/db/repl/rs.h" // theReplSet - -namespace mongo { - - // used in replAuthenticate - static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); - - void SyncSourceFeedback::associateMember(const BSONObj& id, const int memberId) { - const OID rid = id["_id"].OID(); - boost::unique_lock<boost::mutex> lock(_mtx); - _handshakeNeeded = true; - _members[rid] = theReplSet->getMutableMember(memberId); - _cond.notify_all(); - } - - bool SyncSourceFeedback::replAuthenticate(bool skipAuthCheck) { - - if (!AuthorizationManager::isAuthEnabled()) { - return true; - } - if (!skipAuthCheck && !cc().getAuthorizationSession()->hasInternalAuthorization()) { - log() << "replauthenticate: requires internal authorization, failing" << endl; - return false; - } - - if (isInternalAuthSet()) { - return authenticateInternalUser(_connection.get()); - } - - BSONObj user; - { - Client::ReadContext ctxt("local."); - if(!Helpers::findOne("local.system.users", userReplQuery, user) || - // try the first user in local - !Helpers::getSingleton("local.system.users", user)) { - log() << "replauthenticate: no user in local.system.users to use" - << "for authentication" << endl; - return false; - } - } - std::string u = user.getStringField("user"); - std::string p = user.getStringField("pwd"); - massert(16889, "bad user object? [1]", !u.empty()); - massert(16887, "bad user object? [2]", !p.empty()); - - std::string err; - - if( !_connection->auth("local", u.c_str(), p.c_str(), err, false) ) { - log() << "replauthenticate: can't authenticate to master server, user:" << u << endl; - return false; - } - - return true; - } - - void SyncSourceFeedback::ensureMe() { - string myname = getHostName(); - { - Client::WriteContext ctx("local"); - // local.me is an identifier for a server for getLastError w:2+ - if (!Helpers::getSingleton("local.me", _me) || - !_me.hasField("host") || - _me["host"].String() != myname) { - - // clean out local.me - Helpers::emptyCollection("local.me"); - - // repopulate - BSONObjBuilder b; - b.appendOID("_id", 0, true); - b.append("host", myname); - _me = b.obj(); - Helpers::putSingleton("local.me", _me); - } - } - } - - bool SyncSourceFeedback::replHandshake() { - ensureMe(); - - // handshake for us - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - BSONObjBuilder sub (cmd.subobjStart("handshake")); - sub.appendAs(_me["_id"], "handshake"); - sub.append("member", theReplSet->selfId()); - sub.append("config", theReplSet->myConfig().asBson()); - sub.doneFast(); - - BSONObj res; - try { - if (!_connection->runCommand("admin", cmd.obj(), res)) { - if (res["errmsg"].str().find("no such cmd") != std::string::npos) { - _supportsUpdater = false; - } - resetConnection(); - return false; - } - else { - _supportsUpdater = true; - } - } - catch (const DBException& e) { - log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl; - resetConnection(); - return false; - } - - // handshakes for those connected to us - { - for (OIDMemberMap::iterator itr = _members.begin(); - itr != _members.end(); ++itr) { - BSONObjBuilder slaveCmd; - slaveCmd.append("replSetUpdatePosition", 1); - // outer handshake indicates this is a handshake command - // inner is needed as part of the structure to be passed to gotHandshake - BSONObjBuilder slaveSub (slaveCmd.subobjStart("handshake")); - slaveSub.append("handshake", itr->first); - slaveSub.append("member", itr->second->id()); - slaveSub.append("config", itr->second->config().asBson()); - slaveSub.doneFast(); - BSONObj slaveRes; - try { - if (!_connection->runCommand("admin", slaveCmd.obj(), slaveRes)) { - resetConnection(); - return false; - } - } - catch (const DBException& e) { - log() << "SyncSourceFeedback error sending chained handshakes: " - << e.what() << endl; - resetConnection(); - return false; - } - } - } - return true; - } - - bool SyncSourceFeedback::_connect(const std::string& hostName) { - if (hasConnection()) { - return true; - } - _connection.reset(new DBClientConnection(false, 0, OplogReader::tcp_timeout)); - string errmsg; - if (!_connection->connect(hostName.c_str(), errmsg) || - (AuthorizationManager::isAuthEnabled() && !replAuthenticate(true))) { - resetConnection(); - log() << "repl: " << errmsg << endl; - return false; - } - - if (!replHandshake()) { - if (!supportsUpdater()) { - return connectOplogReader(hostName); - } - return false; - } - return true; - } - - bool SyncSourceFeedback::connect(const Member* target) { - boost::unique_lock<boost::mutex> lock(_connmtx); - resetConnection(); - resetOplogReaderConnection(); - _syncTarget = target; - if (_connect(target->fullName())) { - if (!supportsUpdater()) { - return true; - } - } - return false; - } - - void SyncSourceFeedback::forwardSlaveHandshake() { - boost::unique_lock<boost::mutex> lock(_mtx); - _handshakeNeeded = true; - } - - void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) { - boost::unique_lock<boost::mutex> lock(_mtx); - LOG(1) << "replSet last: " << _slaveMap[rid].toString() << " to " << ot.toString() << endl; - // only update if ot is newer than what we have already - if (ot > _slaveMap[rid]) { - _slaveMap[rid] = ot; - _positionChanged = true; - LOG(2) << "now last is " << _slaveMap[rid].toString() << endl; - _cond.notify_all(); - } - } - - bool SyncSourceFeedback::updateUpstream() { - if (theReplSet->isPrimary()) { - // primary has no one to update to - return true; - } - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - // create an array containing objects each member connected to us and for ourself - BSONArrayBuilder array (cmd.subarrayStart("optimes")); - OID myID = _me["_id"].OID(); - { - for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin(); - itr != _slaveMap.end(); ++itr) { - BSONObjBuilder entry(array.subobjStart()); - entry.append("_id", itr->first); - entry.append("optime", itr->second); - if (itr->first == myID) { - entry.append("config", theReplSet->myConfig().asBson()); - } - else { - entry.append("config", _members[itr->first]->config().asBson()); - } - entry.doneFast(); - } - } - array.done(); - BSONObj res; - - bool ok; - try { - ok = _connection->runCommand("admin", cmd.obj(), res); - } - catch (const DBException& e) { - log() << "SyncSourceFeedback error sending update: " << e.what() << endl; - resetConnection(); - return false; - } - if (!ok) { - log() << "SyncSourceFeedback error sending update, response: " << res.toString() <<endl; - resetConnection(); - return false; - } - return true; - } - - void SyncSourceFeedback::run() { - Client::initThread("SyncSourceFeedbackThread"); - while (true) { - { - boost::unique_lock<boost::mutex> lock(_mtx); - while (!_positionChanged && !_handshakeNeeded) { - _cond.wait(lock); - } - boost::unique_lock<boost::mutex> conlock(_connmtx); - const Member* target = replset::BackgroundSync::get()->getSyncTarget(); - if (_syncTarget != target) { - resetConnection(); - _syncTarget = target; - } - if (!hasConnection()) { - // fix connection if need be - if (!target) { - continue; - } - if (!_connect(target->fullName())) { - continue; - } - else if (!supportsUpdater()) { - _handshakeNeeded = false; - _positionChanged = false; - continue; - } - } - if (_handshakeNeeded) { - if (!replHandshake()) { - _handshakeNeeded = true; - continue; - } - else { - _handshakeNeeded = false; - } - } - if (_positionChanged) { - if (!updateUpstream()) { - _positionChanged = true; - continue; - } - else { - _positionChanged = false; - } - } - } - } - } -} diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h deleted file mode 100644 index 72073841029..00000000000 --- a/src/mongo/db/repl/sync_source_feedback.h +++ /dev/null @@ -1,171 +0,0 @@ -/** -* Copyright (C) 2013 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - - -#pragma once - -#include "mongo/db/repl/oplogreader.h" -#include "mongo/util/background.h" - - -namespace mongo { - - class Member; - - class SyncSourceFeedback : public BackgroundJob { - public: - SyncSourceFeedback() : BackgroundJob(false /*don't selfdelete*/), - _syncTarget(NULL), - _oplogReader(new OplogReader(true)), - _supportsUpdater(true) {} - - ~SyncSourceFeedback() { - delete _oplogReader; - } - - /// Adds an entry to _member for a secondary that has connected to us. - void associateMember(const BSONObj& id, const int memberId); - - /// Passes handshake up the replication chain, upon receiving a handshake. - void forwardSlaveHandshake(); - - void updateSelfInMap(const OpTime& ot) { - ensureMe(); - updateMap(_me["_id"].OID(), ot); - } - - /// Connect to sync target and create OplogReader if needed. - bool connect(const Member* target); - - void resetConnection() { - _connection.reset(); - } - - void resetOplogReaderConnection() { - _oplogReader->resetConnection(); - } - - /// Used extensively in bgsync, to see if we need to use the OplogReader syncing method. - bool supportsUpdater() const { - // oplogReader will be NULL if new updater is supported - //boost::unique_lock<boost::mutex> lock(_mtx); - return _supportsUpdater; - } - - /// Updates the _slaveMap to be forwarded to the sync target. - void updateMap(const mongo::OID& rid, const OpTime& ot); - - std::string name() const { return "SyncSourceFeedbackThread"; } - - /// Loops forever, passing updates when they are present. - void run(); - - /* The below methods just fall through to OplogReader and are only used when our sync target - * does not support the update command. - */ - bool connectOplogReader(const std::string& hostName) { - return _oplogReader->connect(hostName); - } - - bool connect(const BSONObj& rid, const int from, const string& to) { - return _oplogReader->connect(rid, from, to); - } - - void ghostQueryGTE(const char *ns, OpTime t) { - _oplogReader->ghostQueryGTE(ns, t); - } - - bool haveCursor() { - return _oplogReader->haveCursor(); - } - - bool more() { - return _oplogReader->more(); - } - - bool moreInCurrentBatch() { - return _oplogReader->moreInCurrentBatch(); - } - - BSONObj nextSafe() { - return _oplogReader->nextSafe(); - } - - void tailCheck() { - _oplogReader->tailCheck(); - } - - void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) { - _oplogReader->tailingQueryGTE(ns, t, fields); - } - - private: - /// Ensures local.me is populated and populates it if not. - void ensureMe(); - - /* Generally replAuthenticate will only be called within system threads to fully - * authenticate connections to other nodes in the cluster that will be used as part of - * internal operations. If a user-initiated action results in needing to call - * replAuthenticate, you can call it with skipAuthCheck set to false. Only do this if you - * are certain that the proper auth checks have already run to ensure that the user is - * authorized to do everything that this connection will be used for! - */ - bool replAuthenticate(bool skipAuthCheck); - - /* Sends initialization information to our sync target, also determines whether or not they - * support the updater command. - */ - bool replHandshake(); - - /* Inform the sync target of our current position in the oplog, as well as the positions - * of all secondaries chained through us. - */ - bool updateUpstream(); - - bool hasConnection() { - return _connection.get(); - } - - /// Connect to sync target and create OplogReader if needed. - bool _connect(const std::string& hostName); - - // stores our OID to be passed along in commands - BSONObj _me; - // the member we are currently syncing from - const Member* _syncTarget; - // holds the oplogReader for use when we fall back to old style updates - OplogReader* _oplogReader; - // our connection to our sync target - boost::scoped_ptr<DBClientConnection> _connection; - // tracks whether we are in fallback mode or not - bool _supportsUpdater; - // protects connection - boost::mutex _connmtx; - // protects cond and maps and the indicator bools - boost::mutex _mtx; - // contains the most recent optime of each member syncing to us - map<mongo::OID, OpTime> _slaveMap; - typedef map<mongo::OID, Member*> OIDMemberMap; - // contains a pointer to each member, which we can look up by oid - OIDMemberMap _members; - // used to alert our thread of changes which need to be passed up the chain - boost::condition _cond; - // used to indicate a position change which has not yet been pushed along - bool _positionChanged; - // used to indicate a connection change which has not yet been shook on - bool _handshakeNeeded; - }; -} diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp index d05575f2135..0564bc0a678 100644 --- a/src/mongo/db/repl/write_concern.cpp +++ b/src/mongo/db/repl/write_concern.cpp @@ -122,22 +122,20 @@ namespace mongo { scoped_lock mylk(_mutex); - if (last > _slaves[ident]) { - _slaves[ident] = last; - _dirty = true; + _slaves[ident] = last; + _dirty = true; - if (theReplSet && theReplSet->isPrimary()) { - theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last); - } - - if ( ! _started ) { - // start background thread here since we definitely need it - _started = true; - go(); - } + if (theReplSet && theReplSet->isPrimary()) { + theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last); + } - _threadsWaitingForReplication.notify_all(); + if ( ! _started ) { + // start background thread here since we definitely need it + _started = true; + go(); } + + _threadsWaitingForReplication.notify_all(); } bool opReplicatedEnough( OpTime op , BSONElement w ) { @@ -255,28 +253,6 @@ namespace mongo { const char * SlaveTracking::NS = "local.slaves"; - // parse optimes from replUpdatePositionCommand and pass them to SyncSourceFeedback - void updateSlaveLocations(BSONArray optimes) { - BSONForEach(elem, optimes) { - BSONObj entry = elem.Obj(); - BSONObj id = BSON("_id" << entry["_id"].OID()); - OpTime ot = entry["optime"]._opTime(); - BSONObj config = entry["config"].Obj(); - - // update locally - slaveTracking.update(id, config, "local.oplog.rs", ot); - if (theReplSet && !theReplSet->isPrimary()) { - // pass along if we are not primary - theReplSet->syncSourceFeedback.updateMap(entry["_id"].OID(), ot); - // for to be backwards compatible - theReplSet->ghost->send(boost::bind(&GhostSync::percolate, - theReplSet->ghost, - id, - ot)); - } - } - } - void updateSlaveLocation( CurOp& curop, const char * ns , OpTime lastOp ) { if ( lastOp.isNull() ) return; diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h index b28ae1ae830..d9b0163635b 100644 --- a/src/mongo/db/repl/write_concern.h +++ b/src/mongo/db/repl/write_concern.h @@ -29,8 +29,6 @@ namespace mongo { class CurOp; - void updateSlaveLocations(BSONArray optimes); - void updateSlaveLocation( CurOp& curop, const char * oplog_ns , OpTime lastOp ); /** @return true if op has made it to w servers */ |