summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplogreader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplogreader.cpp')
-rw-r--r--src/mongo/db/repl/oplogreader.cpp264
1 files changed, 127 insertions, 137 deletions
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index d982eae975e..012d7d2458c 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -52,162 +52,152 @@
namespace mongo {
- using std::shared_ptr;
- using std::endl;
- using std::string;
+using std::shared_ptr;
+using std::endl;
+using std::string;
namespace repl {
- const BSONObj reverseNaturalObj = BSON( "$natural" << -1 );
+const BSONObj reverseNaturalObj = BSON("$natural" << -1);
- //number of readers created;
- // this happens when the source source changes, a reconfig/network-error or the cursor dies
- static Counter64 readersCreatedStats;
- static ServerStatusMetricField<Counter64> displayReadersCreated(
- "repl.network.readersCreated",
- &readersCreatedStats );
+// number of readers created;
+// this happens when the source source changes, a reconfig/network-error or the cursor dies
+static Counter64 readersCreatedStats;
+static ServerStatusMetricField<Counter64> displayReadersCreated("repl.network.readersCreated",
+ &readersCreatedStats);
- bool replAuthenticate(DBClientBase *conn) {
- if (!getGlobalAuthorizationManager()->isAuthEnabled())
- return true;
+bool replAuthenticate(DBClientBase* conn) {
+ if (!getGlobalAuthorizationManager()->isAuthEnabled())
+ return true;
- if (!isInternalAuthSet())
- return false;
- return authenticateInternalUser(conn);
- }
+ if (!isInternalAuthSet())
+ return false;
+ return authenticateInternalUser(conn);
+}
- OplogReader::OplogReader() {
- _tailingQueryOptions = QueryOption_SlaveOk;
- _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay;
-
- /* TODO: slaveOk maybe shouldn't use? */
- _tailingQueryOptions |= QueryOption_AwaitData;
+OplogReader::OplogReader() {
+ _tailingQueryOptions = QueryOption_SlaveOk;
+ _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay;
- readersCreatedStats.increment();
- }
+ /* TODO: slaveOk maybe shouldn't use? */
+ _tailingQueryOptions |= QueryOption_AwaitData;
- bool OplogReader::connect(const HostAndPort& host) {
- if (conn() == NULL || _host != host) {
- resetConnection();
- _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false,
- tcp_timeout));
- string errmsg;
- if ( !_conn->connect(host, errmsg) ||
- (getGlobalAuthorizationManager()->isAuthEnabled() &&
- !replAuthenticate(_conn.get())) ) {
-
- resetConnection();
- error() << errmsg << endl;
- return false;
- }
- _conn->port().tag |= executor::NetworkInterface::kMessagingPortKeepOpen;
- _host = host;
- }
- return true;
- }
+ readersCreatedStats.increment();
+}
- void OplogReader::tailCheck() {
- if( cursor.get() && cursor->isDead() ) {
- log() << "old cursor isDead, will initiate a new one" << std::endl;
- resetCursor();
+bool OplogReader::connect(const HostAndPort& host) {
+ if (conn() == NULL || _host != host) {
+ resetConnection();
+ _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, tcp_timeout));
+ string errmsg;
+ if (!_conn->connect(host, errmsg) ||
+ (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate(_conn.get()))) {
+ resetConnection();
+ error() << errmsg << endl;
+ return false;
}
+ _conn->port().tag |= executor::NetworkInterface::kMessagingPortKeepOpen;
+ _host = host;
}
+ return true;
+}
- void OplogReader::query(const char *ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fields) {
- cursor.reset(
- _conn->query(ns, query, nToReturn, nToSkip, fields, QueryOption_SlaveOk).release()
- );
- }
-
- void OplogReader::tailingQuery(const char *ns, const BSONObj& query) {
- verify( !haveCursor() );
- LOG(2) << ns << ".find(" << query.toString() << ')' << endl;
- cursor.reset( _conn->query( ns, query, 0, 0, nullptr, _tailingQueryOptions ).release() );
- }
-
- void OplogReader::tailingQueryGTE(const char *ns, Timestamp optime) {
- BSONObjBuilder gte;
- gte.append("$gte", optime);
- BSONObjBuilder query;
- query.append("ts", gte.done());
- tailingQuery(ns, query.done());
- }
-
- HostAndPort OplogReader::getHost() const {
- return _host;
+void OplogReader::tailCheck() {
+ if (cursor.get() && cursor->isDead()) {
+ log() << "old cursor isDead, will initiate a new one" << std::endl;
+ resetCursor();
}
-
- void OplogReader::connectToSyncSource(OperationContext* txn,
- const OpTime& lastOpTimeFetched,
- ReplicationCoordinator* replCoord) {
- const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
- const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max());
- OpTime oldestOpTimeSeen = sentinel;
-
- invariant(conn() == NULL);
-
- while (true) {
- HostAndPort candidate = replCoord->chooseNewSyncSource();
-
- if (candidate.empty()) {
- if (oldestOpTimeSeen == sentinel) {
- // If, in this invocation of connectToSyncSource(), we did not successfully
- // connect to any node ahead of us,
- // we apparently have no sync sources to connect to.
- // This situation is common; e.g. if there are no writes to the primary at
- // the moment.
- return;
- }
-
- // Connected to at least one member, but in all cases we were too stale to use them
- // as a sync source.
- error() << "too stale to catch up";
- log() << "our last optime : " << lastOpTimeFetched;
- log() << "oldest available is " << oldestOpTimeSeen;
- log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
- setMinValid(txn, oldestOpTimeSeen);
- bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING);
- if (!worked) {
- warning() << "Failed to transition into "
- << MemberState(MemberState::RS_RECOVERING)
- << ". Current state: " << replCoord->getMemberState();
- }
+}
+
+void OplogReader::query(
+ const char* ns, Query query, int nToReturn, int nToSkip, const BSONObj* fields) {
+ cursor.reset(
+ _conn->query(ns, query, nToReturn, nToSkip, fields, QueryOption_SlaveOk).release());
+}
+
+void OplogReader::tailingQuery(const char* ns, const BSONObj& query) {
+ verify(!haveCursor());
+ LOG(2) << ns << ".find(" << query.toString() << ')' << endl;
+ cursor.reset(_conn->query(ns, query, 0, 0, nullptr, _tailingQueryOptions).release());
+}
+
+void OplogReader::tailingQueryGTE(const char* ns, Timestamp optime) {
+ BSONObjBuilder gte;
+ gte.append("$gte", optime);
+ BSONObjBuilder query;
+ query.append("ts", gte.done());
+ tailingQuery(ns, query.done());
+}
+
+HostAndPort OplogReader::getHost() const {
+ return _host;
+}
+
+void OplogReader::connectToSyncSource(OperationContext* txn,
+ const OpTime& lastOpTimeFetched,
+ ReplicationCoordinator* replCoord) {
+ const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
+ const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max());
+ OpTime oldestOpTimeSeen = sentinel;
+
+ invariant(conn() == NULL);
+
+ while (true) {
+ HostAndPort candidate = replCoord->chooseNewSyncSource();
+
+ if (candidate.empty()) {
+ if (oldestOpTimeSeen == sentinel) {
+ // If, in this invocation of connectToSyncSource(), we did not successfully
+ // connect to any node ahead of us,
+ // we apparently have no sync sources to connect to.
+ // This situation is common; e.g. if there are no writes to the primary at
+ // the moment.
return;
}
- if (!connect(candidate)) {
- LOG(2) << "can't connect to " << candidate.toString() <<
- " to read operations";
- resetConnection();
- replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10));
- continue;
+ // Connected to at least one member, but in all cases we were too stale to use them
+ // as a sync source.
+ error() << "too stale to catch up";
+ log() << "our last optime : " << lastOpTimeFetched;
+ log() << "oldest available is " << oldestOpTimeSeen;
+ log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
+ setMinValid(txn, oldestOpTimeSeen);
+ bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING);
+ if (!worked) {
+ warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
+ << ". Current state: " << replCoord->getMemberState();
}
- // Read the first (oldest) op and confirm that it's not newer than our last
- // fetched op. Otherwise, we have fallen off the back of that source's oplog.
- BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query()));
- OpTime remoteOldOpTime = extractOpTime(remoteOldestOp);
-
- if (lastOpTimeFetched < remoteOldOpTime) {
- // We're too stale to use this sync source.
- resetConnection();
- replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10));
- if (oldestOpTimeSeen > remoteOldOpTime) {
- warning() << "we are too stale to use " << candidate.toString() <<
- " as a sync source";
- oldestOpTimeSeen = remoteOldOpTime;
- }
- continue;
+ return;
+ }
+
+ if (!connect(candidate)) {
+ LOG(2) << "can't connect to " << candidate.toString() << " to read operations";
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10));
+ continue;
+ }
+ // Read the first (oldest) op and confirm that it's not newer than our last
+ // fetched op. Otherwise, we have fallen off the back of that source's oplog.
+ BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query()));
+ OpTime remoteOldOpTime = extractOpTime(remoteOldestOp);
+
+ if (lastOpTimeFetched < remoteOldOpTime) {
+ // We're too stale to use this sync source.
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10));
+ if (oldestOpTimeSeen > remoteOldOpTime) {
+ warning() << "we are too stale to use " << candidate.toString()
+ << " as a sync source";
+ oldestOpTimeSeen = remoteOldOpTime;
}
+ continue;
+ }
- // Got a valid sync source.
- return;
- } // while (true)
- }
+ // Got a valid sync source.
+ return;
+ } // while (true)
+}
-} // namespace repl
-} // namespace mongo
+} // namespace repl
+} // namespace mongo