summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2014-09-05 13:50:07 -0400
committerEric Milkie <milkie@10gen.com>2014-09-05 15:39:25 -0400
commit4ce061a127e9f1fb1d31fbaeb64d1179a319316c (patch)
tree52b3edef55e923346c3e54e68b5ad7fbb4197693 /src
parentf7c4d34ddb4a88480a1091d1c39e9db450c2d244 (diff)
downloadmongo-4ce061a127e9f1fb1d31fbaeb64d1179a319316c.tar.gz
SERVER-15089 cleanup oplogreader
1. Move some master/slave oplogreader code into master_slave.cpp 2. fix up includes for some files; remove pch.h from a header (!) 3. remove awaitCapable check 4. add new "_host" field to oplogreader to store the HostAndPort we're connected to; will be used by new Applier
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/bgsync.cpp2
-rw-r--r--src/mongo/db/repl/master_slave.cpp44
-rw-r--r--src/mongo/db/repl/master_slave.h1
-rw-r--r--src/mongo/db/repl/oplogreader.cpp80
-rw-r--r--src/mongo/db/repl/oplogreader.h45
-rw-r--r--src/mongo/db/repl/repl_set_impl.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp1
-rw-r--r--src/mongo/db/repl/sync_tail.cpp1
-rw-r--r--src/mongo/util/queue.h6
9 files changed, 59 insertions, 123 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 2ed63b2b512..0e3ad68352b 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -213,8 +213,6 @@ namespace repl {
return;
}
- uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );
-
if (isRollbackRequired(txn, r)) {
stop();
return;
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 5135f77509d..044ecf88029 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -337,6 +337,36 @@ namespace repl {
replAllDead = 0;
}
+ bool replHandshake(DBClientConnection *conn, const OID& myRID) {
+ string myname = getHostName();
+
+ BSONObjBuilder cmd;
+ cmd.append("handshake", myRID);
+
+ BSONObj res;
+ bool ok = conn->runCommand( "admin" , cmd.obj() , res );
+ // ignoring for now on purpose for older versions
+ LOG( ok ? 1 : 0 ) << "replHandshake res not: " << ok << " res: " << res << endl;
+ return true;
+ }
+
+ bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) {
+ if (reader->conn()) {
+ return true;
+ }
+
+ if (!reader->connect(host)) {
+ return false;
+ }
+
+ if (!replHandshake(reader->conn(), myRID)) {
+ return false;
+ }
+
+ return true;
+ }
+
+
void ReplSource::forceResync( OperationContext* txn, const char *requester ) {
BSONObj info;
{
@@ -344,8 +374,8 @@ namespace repl {
invariant(txn->lockState()->isW());
Lock::TempRelease tempRelease(txn->lockState());
- if (!oplogReader.connect(HostAndPort(hostName),
- getGlobalReplicationCoordinator()->getMyRID())) {
+ if (!_connect(&oplogReader, HostAndPort(hostName),
+ getGlobalReplicationCoordinator()->getMyRID())) {
msgassertedNoTrace( 14051 , "unable to connect to resync");
}
/* todo use getDatabaseNames() method here */
@@ -829,8 +859,7 @@ namespace repl {
if ( !oplogReader.more() ) {
if ( tailing ) {
LOG(2) << "repl: tailing & no new activity\n";
- if( oplogReader.awaitCapable() )
- okResultCode = 0; // don't sleep
+ okResultCode = 0; // don't sleep
}
else {
@@ -911,7 +940,7 @@ namespace repl {
if ( moreInitialSyncsPending || !oplogReader.more() ) {
Lock::GlobalWrite lk(txn->lockState());
- if (oplogReader.awaitCapable() && tailing) {
+ if (tailing) {
okResultCode = 0; // don't sleep
}
@@ -1021,8 +1050,9 @@ namespace repl {
return -1;
}
- if ( !oplogReader.connect(HostAndPort(hostName),
- getGlobalReplicationCoordinator()->getMyRID()) ) {
+ if ( !_connect(&oplogReader,
+ HostAndPort(hostName),
+ getGlobalReplicationCoordinator()->getMyRID()) ) {
LOG(4) << "repl: can't connect to sync source" << endl;
return -1;
}
diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h
index 24c4e30c968..6aa52339507 100644
--- a/src/mongo/db/repl/master_slave.h
+++ b/src/mongo/db/repl/master_slave.h
@@ -122,6 +122,7 @@ namespace repl {
void forceResync(OperationContext* txn, const char *requester);
+ bool _connect(OplogReader* reader, const HostAndPort& host, const OID& myRID);
public:
OplogReader oplogReader;
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index acb2eb71d7d..23997eabeb3 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -71,23 +71,6 @@ namespace repl {
return authenticateInternalUser(conn);
}
- bool replHandshake(DBClientConnection *conn, const OID& myRID) {
- string myname = getHostName();
-
- BSONObjBuilder cmd;
- cmd.append("handshake", myRID);
- if (theReplSet) {
- cmd.append("member", theReplSet->selfId());
- cmd.append("config", theReplSet->myConfig().asBson());
- }
-
- BSONObj res;
- bool ok = conn->runCommand( "admin" , cmd.obj() , res );
- // ignoring for now on purpose for older versions
- LOG( ok ? 1 : 0 ) << "replHandshake res not: " << ok << " res: " << res << endl;
- return true;
- }
-
OplogReader::OplogReader() {
_tailingQueryOptions = QueryOption_SlaveOk;
_tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay;
@@ -98,8 +81,8 @@ namespace repl {
readersCreatedStats.increment();
}
- bool OplogReader::commonConnect(const HostAndPort& host) {
- if( conn() == 0 ) {
+ bool OplogReader::connect(const HostAndPort& host) {
+ if (conn() == NULL) {
_conn = shared_ptr<DBClientConnection>(new DBClientConnection(false,
0,
tcp_timeout));
@@ -112,49 +95,11 @@ namespace repl {
log() << "repl: " << errmsg << endl;
return false;
}
+ _host = host;
}
return true;
}
- bool OplogReader::connect(const HostAndPort& host) {
- if (conn()) {
- return true;
- }
-
- if (!commonConnect(host)) {
- return false;
- }
-
- return true;
- }
-
- bool OplogReader::connect(const HostAndPort& host, const OID& myRID) {
- if (conn()) {
- return true;
- }
-
- if (!commonConnect(host)) {
- return false;
- }
-
- if (!replHandshake(_conn.get(), myRID)) {
- return false;
- }
-
- return true;
- }
-
- bool OplogReader::connect(const mongo::OID& rid, const int from, const HostAndPort& to) {
- if (conn() != 0) {
- return true;
- }
- if (commonConnect(to)) {
- log() << "handshake between " << from << " and " << to.toString() << endl;
- return passthroughHandshake(rid, from);
- }
- return false;
- }
-
void OplogReader::tailCheck() {
if( cursor.get() && cursor->isDead() ) {
log() << "repl: old cursor isDead, will initiate a new one" << std::endl;
@@ -162,21 +107,6 @@ namespace repl {
}
}
- bool OplogReader::passthroughHandshake(const mongo::OID& rid, const int nextOnChainId) {
- BSONObjBuilder cmd;
- cmd.append("handshake", rid);
- if (theReplSet) {
- const Member* chainedMember = theReplSet->findById(nextOnChainId);
- if (chainedMember != NULL) {
- cmd.append("config", chainedMember->config().asBson());
- }
- }
- cmd.append("member", nextOnChainId);
-
- BSONObj res;
- return conn()->runCommand("admin", cmd.obj(), res);
- }
-
void OplogReader::query(const char *ns,
Query query,
int nToReturn,
@@ -201,5 +131,9 @@ namespace repl {
tailingQuery(ns, query.done(), fields);
}
+ HostAndPort OplogReader::getHost() const {
+ return _host;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 0ce08a19d62..884276035ee 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -33,6 +33,7 @@
#include "mongo/client/constants.h"
#include "mongo/client/dbclientcursor.h"
+#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -51,9 +52,14 @@ namespace repl {
*/
class OplogReader {
+ private:
shared_ptr<DBClientConnection> _conn;
shared_ptr<DBClientCursor> cursor;
int _tailingQueryOptions;
+
+ // If _conn was actively connected, _host represents the current HostAndPort of the
+ // connection.
+ HostAndPort _host;
public:
OplogReader();
~OplogReader() { }
@@ -61,6 +67,7 @@ namespace repl {
void resetConnection() {
cursor.reset();
_conn.reset();
+ _host = HostAndPort();
}
DBClientConnection* conn() { return _conn.get(); }
BSONObj findOne(const char *ns, const Query& q) {
@@ -76,36 +83,10 @@ namespace repl {
/* ok to call if already connected */
bool connect(const HostAndPort& host);
- bool connect(const HostAndPort& host, const OID& myRID);
-
- bool connect(const mongo::OID& rid, const int from, const HostAndPort& to);
-
void tailCheck();
bool haveCursor() { return cursor.get() != 0; }
- /** this is ok but commented out as when used one should consider if QueryOption_OplogReplay
- is needed; if not fine, but if so, need to change.
- *//*
- void query(const char *ns, const BSONObj& query) {
- verify( !haveCursor() );
- cursor.reset( _conn->query(ns, query, 0, 0, 0, QueryOption_SlaveOk).release() );
- }*/
-
- /** this can be used; it is commented out as it does not indicate
- QueryOption_OplogReplay and that is likely important. could be uncommented
- just need to add that.
- */
- /*
- void queryGTE(const char *ns, OpTime t) {
- BSONObjBuilder q;
- q.appendDate("$gte", t.asDate());
- BSONObjBuilder q2;
- q2.append("ts", q.done());
- query(ns, q2.done());
- }
- */
-
void query(const char *ns,
Query query,
int nToReturn,
@@ -138,11 +119,6 @@ namespace repl {
return cursor->getMessage()->size();
}
- /* old mongod's can't do the await flag... */
- bool awaitCapable() {
- return cursor->hasResultFlag(ResultFlag_AwaitCapable);
- }
-
int getTailingQueryOptions() const { return _tailingQueryOptions; }
void setTailingQueryOptions( int tailingQueryOptions ) { _tailingQueryOptions = tailingQueryOptions; }
@@ -153,11 +129,8 @@ namespace repl {
BSONObj nextSafe() { return cursor->nextSafe(); }
BSONObj next() { return cursor->next(); }
void putBack(BSONObj op) { cursor->putBack(op); }
-
- private:
- /** @return true iff connection was successful */
- bool commonConnect(const HostAndPort& host);
- bool passthroughHandshake(const mongo::OID& rid, const int f);
+
+ HostAndPort getHost() const;
};
} // namespace repl
diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp
index b5a25356041..5b3fe3b8545 100644
--- a/src/mongo/db/repl/repl_set_impl.cpp
+++ b/src/mongo/db/repl/repl_set_impl.cpp
@@ -489,7 +489,7 @@ namespace {
bool meEnsured = false;
while (!inShutdown() && !meEnsured) {
try {
- theReplSet->syncSourceFeedback.ensureMe(&txn);
+ syncSourceFeedback.ensureMe(&txn);
meEnsured = true;
}
catch (const DBException& e) {
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index db7055d5598..f55660af233 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/member.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/operation_context.h"
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index d96fe42ea00..d5272b7643d 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/prefetch.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/operation_context_impl.h"
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index 7142f527895..a087635d6fa 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -29,13 +29,11 @@
#pragma once
-#include "mongo/pch.h"
-
+#include <boost/thread/condition.hpp>
#include <limits>
#include <queue>
-#include <boost/thread/condition.hpp>
-
+#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/timer.h"
namespace mongo {