diff options
author | Eric Milkie <milkie@10gen.com> | 2014-09-05 13:50:07 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2014-09-05 15:39:25 -0400 |
commit | 4ce061a127e9f1fb1d31fbaeb64d1179a319316c (patch) | |
tree | 52b3edef55e923346c3e54e68b5ad7fbb4197693 /src | |
parent | f7c4d34ddb4a88480a1091d1c39e9db450c2d244 (diff) | |
download | mongo-4ce061a127e9f1fb1d31fbaeb64d1179a319316c.tar.gz |
SERVER-15089 cleanup oplogreader
1. Move some master/slave oplogreader code into master_slave.cpp
2. fix up includes for some files; remove pch.h from a header (!)
3. remove awaitCapable check
4. add new "_host" field to oplogreader to store the HostAndPort we're connected
to; will be used by new Applier
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 44 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/repl/oplogreader.h | 45 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 1 | ||||
-rw-r--r-- | src/mongo/util/queue.h | 6 |
9 files changed, 59 insertions, 123 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 2ed63b2b512..0e3ad68352b 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -213,8 +213,6 @@ namespace repl { return; } - uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() ); - if (isRollbackRequired(txn, r)) { stop(); return; diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 5135f77509d..044ecf88029 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -337,6 +337,36 @@ namespace repl { replAllDead = 0; } + bool replHandshake(DBClientConnection *conn, const OID& myRID) { + string myname = getHostName(); + + BSONObjBuilder cmd; + cmd.append("handshake", myRID); + + BSONObj res; + bool ok = conn->runCommand( "admin" , cmd.obj() , res ); + // ignoring for now on purpose for older versions + LOG( ok ? 1 : 0 ) << "replHandshake res not: " << ok << " res: " << res << endl; + return true; + } + + bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) { + if (reader->conn()) { + return true; + } + + if (!reader->connect(host)) { + return false; + } + + if (!replHandshake(reader->conn(), myRID)) { + return false; + } + + return true; + } + + void ReplSource::forceResync( OperationContext* txn, const char *requester ) { BSONObj info; { @@ -344,8 +374,8 @@ namespace repl { invariant(txn->lockState()->isW()); Lock::TempRelease tempRelease(txn->lockState()); - if (!oplogReader.connect(HostAndPort(hostName), - getGlobalReplicationCoordinator()->getMyRID())) { + if (!_connect(&oplogReader, HostAndPort(hostName), + getGlobalReplicationCoordinator()->getMyRID())) { msgassertedNoTrace( 14051 , "unable to connect to resync"); } /* todo use getDatabaseNames() method here */ @@ -829,8 +859,7 @@ namespace repl { if ( !oplogReader.more() ) { if ( tailing ) { LOG(2) << "repl: tailing & no new activity\n"; - if( oplogReader.awaitCapable() ) - okResultCode = 0; // don't sleep + okResultCode = 0; // don't sleep } else { @@ -911,7 +940,7 @@ namespace repl { if ( moreInitialSyncsPending || !oplogReader.more() ) { Lock::GlobalWrite lk(txn->lockState()); - if (oplogReader.awaitCapable() && tailing) { + if (tailing) { okResultCode = 0; // don't sleep } @@ -1021,8 +1050,9 @@ namespace repl { return -1; } - if ( !oplogReader.connect(HostAndPort(hostName), - getGlobalReplicationCoordinator()->getMyRID()) ) { + if ( !_connect(&oplogReader, + HostAndPort(hostName), + getGlobalReplicationCoordinator()->getMyRID()) ) { LOG(4) << "repl: can't connect to sync source" << endl; return -1; } diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h index 24c4e30c968..6aa52339507 100644 --- a/src/mongo/db/repl/master_slave.h +++ b/src/mongo/db/repl/master_slave.h @@ -122,6 +122,7 @@ namespace repl { void forceResync(OperationContext* txn, const char *requester); + bool _connect(OplogReader* reader, const HostAndPort& host, const OID& myRID); public: OplogReader oplogReader; diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index acb2eb71d7d..23997eabeb3 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -71,23 +71,6 @@ namespace repl { return authenticateInternalUser(conn); } - bool replHandshake(DBClientConnection *conn, const OID& myRID) { - string myname = getHostName(); - - BSONObjBuilder cmd; - cmd.append("handshake", myRID); - if (theReplSet) { - cmd.append("member", theReplSet->selfId()); - cmd.append("config", theReplSet->myConfig().asBson()); - } - - BSONObj res; - bool ok = conn->runCommand( "admin" , cmd.obj() , res ); - // ignoring for now on purpose for older versions - LOG( ok ? 1 : 0 ) << "replHandshake res not: " << ok << " res: " << res << endl; - return true; - } - OplogReader::OplogReader() { _tailingQueryOptions = QueryOption_SlaveOk; _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay; @@ -98,8 +81,8 @@ namespace repl { readersCreatedStats.increment(); } - bool OplogReader::commonConnect(const HostAndPort& host) { - if( conn() == 0 ) { + bool OplogReader::connect(const HostAndPort& host) { + if (conn() == NULL) { _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, 0, tcp_timeout)); @@ -112,49 +95,11 @@ namespace repl { log() << "repl: " << errmsg << endl; return false; } + _host = host; } return true; } - bool OplogReader::connect(const HostAndPort& host) { - if (conn()) { - return true; - } - - if (!commonConnect(host)) { - return false; - } - - return true; - } - - bool OplogReader::connect(const HostAndPort& host, const OID& myRID) { - if (conn()) { - return true; - } - - if (!commonConnect(host)) { - return false; - } - - if (!replHandshake(_conn.get(), myRID)) { - return false; - } - - return true; - } - - bool OplogReader::connect(const mongo::OID& rid, const int from, const HostAndPort& to) { - if (conn() != 0) { - return true; - } - if (commonConnect(to)) { - log() << "handshake between " << from << " and " << to.toString() << endl; - return passthroughHandshake(rid, from); - } - return false; - } - void OplogReader::tailCheck() { if( cursor.get() && cursor->isDead() ) { log() << "repl: old cursor isDead, will initiate a new one" << std::endl; @@ -162,21 +107,6 @@ namespace repl { } } - bool OplogReader::passthroughHandshake(const mongo::OID& rid, const int nextOnChainId) { - BSONObjBuilder cmd; - cmd.append("handshake", rid); - if (theReplSet) { - const Member* chainedMember = theReplSet->findById(nextOnChainId); - if (chainedMember != NULL) { - cmd.append("config", chainedMember->config().asBson()); - } - } - cmd.append("member", nextOnChainId); - - BSONObj res; - return conn()->runCommand("admin", cmd.obj(), res); - } - void OplogReader::query(const char *ns, Query query, int nToReturn, @@ -201,5 +131,9 @@ namespace repl { tailingQuery(ns, query.done(), fields); } + HostAndPort OplogReader::getHost() const { + return _host; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 0ce08a19d62..884276035ee 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -33,6 +33,7 @@ #include "mongo/client/constants.h" #include "mongo/client/dbclientcursor.h" +#include "mongo/util/net/hostandport.h" namespace mongo { @@ -51,9 +52,14 @@ namespace repl { */ class OplogReader { + private: shared_ptr<DBClientConnection> _conn; shared_ptr<DBClientCursor> cursor; int _tailingQueryOptions; + + // If _conn was actively connected, _host represents the current HostAndPort of the + // connection. + HostAndPort _host; public: OplogReader(); ~OplogReader() { } @@ -61,6 +67,7 @@ namespace repl { void resetConnection() { cursor.reset(); _conn.reset(); + _host = HostAndPort(); } DBClientConnection* conn() { return _conn.get(); } BSONObj findOne(const char *ns, const Query& q) { @@ -76,36 +83,10 @@ namespace repl { /* ok to call if already connected */ bool connect(const HostAndPort& host); - bool connect(const HostAndPort& host, const OID& myRID); - - bool connect(const mongo::OID& rid, const int from, const HostAndPort& to); - void tailCheck(); bool haveCursor() { return cursor.get() != 0; } - /** this is ok but commented out as when used one should consider if QueryOption_OplogReplay - is needed; if not fine, but if so, need to change. - *//* - void query(const char *ns, const BSONObj& query) { - verify( !haveCursor() ); - cursor.reset( _conn->query(ns, query, 0, 0, 0, QueryOption_SlaveOk).release() ); - }*/ - - /** this can be used; it is commented out as it does not indicate - QueryOption_OplogReplay and that is likely important. could be uncommented - just need to add that. - */ - /* - void queryGTE(const char *ns, OpTime t) { - BSONObjBuilder q; - q.appendDate("$gte", t.asDate()); - BSONObjBuilder q2; - q2.append("ts", q.done()); - query(ns, q2.done()); - } - */ - void query(const char *ns, Query query, int nToReturn, @@ -138,11 +119,6 @@ namespace repl { return cursor->getMessage()->size(); } - /* old mongod's can't do the await flag... */ - bool awaitCapable() { - return cursor->hasResultFlag(ResultFlag_AwaitCapable); - } - int getTailingQueryOptions() const { return _tailingQueryOptions; } void setTailingQueryOptions( int tailingQueryOptions ) { _tailingQueryOptions = tailingQueryOptions; } @@ -153,11 +129,8 @@ namespace repl { BSONObj nextSafe() { return cursor->nextSafe(); } BSONObj next() { return cursor->next(); } void putBack(BSONObj op) { cursor->putBack(op); } - - private: - /** @return true iff connection was successful */ - bool commonConnect(const HostAndPort& host); - bool passthroughHandshake(const mongo::OID& rid, const int f); + + HostAndPort getHost() const; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index b5a25356041..5b3fe3b8545 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -489,7 +489,7 @@ namespace { bool meEnsured = false; while (!inShutdown() && !meEnsured) { try { - theReplSet->syncSourceFeedback.ensureMe(&txn); + syncSourceFeedback.ensureMe(&txn); meEnsured = true; } catch (const DBException& e) { diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index db7055d5598..f55660af233 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -41,6 +41,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/member.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/operation_context.h" diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index d96fe42ea00..d5272b7643d 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -41,6 +41,7 @@ #include "mongo/db/prefetch.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/rs.h" #include "mongo/db/repl/rslog.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/db/operation_context_impl.h" diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index 7142f527895..a087635d6fa 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -29,13 +29,11 @@ #pragma once -#include "mongo/pch.h" - +#include <boost/thread/condition.hpp> #include <limits> #include <queue> -#include <boost/thread/condition.hpp> - +#include "mongo/util/concurrency/mutex.h" #include "mongo/util/timer.h" namespace mongo { |