diff options
Diffstat (limited to 'src/mongo/db/repl/oplogreader.cpp')
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 264 |
1 files changed, 127 insertions, 137 deletions
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index d982eae975e..012d7d2458c 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -52,162 +52,152 @@ namespace mongo { - using std::shared_ptr; - using std::endl; - using std::string; +using std::shared_ptr; +using std::endl; +using std::string; namespace repl { - const BSONObj reverseNaturalObj = BSON( "$natural" << -1 ); +const BSONObj reverseNaturalObj = BSON("$natural" << -1); - //number of readers created; - // this happens when the source source changes, a reconfig/network-error or the cursor dies - static Counter64 readersCreatedStats; - static ServerStatusMetricField<Counter64> displayReadersCreated( - "repl.network.readersCreated", - &readersCreatedStats ); +// number of readers created; +// this happens when the source source changes, a reconfig/network-error or the cursor dies +static Counter64 readersCreatedStats; +static ServerStatusMetricField<Counter64> displayReadersCreated("repl.network.readersCreated", + &readersCreatedStats); - bool replAuthenticate(DBClientBase *conn) { - if (!getGlobalAuthorizationManager()->isAuthEnabled()) - return true; +bool replAuthenticate(DBClientBase* conn) { + if (!getGlobalAuthorizationManager()->isAuthEnabled()) + return true; - if (!isInternalAuthSet()) - return false; - return authenticateInternalUser(conn); - } + if (!isInternalAuthSet()) + return false; + return authenticateInternalUser(conn); +} - OplogReader::OplogReader() { - _tailingQueryOptions = QueryOption_SlaveOk; - _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay; - - /* TODO: slaveOk maybe shouldn't use? */ - _tailingQueryOptions |= QueryOption_AwaitData; +OplogReader::OplogReader() { + _tailingQueryOptions = QueryOption_SlaveOk; + _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay; - readersCreatedStats.increment(); - } + /* TODO: slaveOk maybe shouldn't use? */ + _tailingQueryOptions |= QueryOption_AwaitData; - bool OplogReader::connect(const HostAndPort& host) { - if (conn() == NULL || _host != host) { - resetConnection(); - _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, - tcp_timeout)); - string errmsg; - if ( !_conn->connect(host, errmsg) || - (getGlobalAuthorizationManager()->isAuthEnabled() && - !replAuthenticate(_conn.get())) ) { - - resetConnection(); - error() << errmsg << endl; - return false; - } - _conn->port().tag |= executor::NetworkInterface::kMessagingPortKeepOpen; - _host = host; - } - return true; - } + readersCreatedStats.increment(); +} - void OplogReader::tailCheck() { - if( cursor.get() && cursor->isDead() ) { - log() << "old cursor isDead, will initiate a new one" << std::endl; - resetCursor(); +bool OplogReader::connect(const HostAndPort& host) { + if (conn() == NULL || _host != host) { + resetConnection(); + _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, tcp_timeout)); + string errmsg; + if (!_conn->connect(host, errmsg) || + (getGlobalAuthorizationManager()->isAuthEnabled() && !replAuthenticate(_conn.get()))) { + resetConnection(); + error() << errmsg << endl; + return false; } + _conn->port().tag |= executor::NetworkInterface::kMessagingPortKeepOpen; + _host = host; } + return true; +} - void OplogReader::query(const char *ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fields) { - cursor.reset( - _conn->query(ns, query, nToReturn, nToSkip, fields, QueryOption_SlaveOk).release() - ); - } - - void OplogReader::tailingQuery(const char *ns, const BSONObj& query) { - verify( !haveCursor() ); - LOG(2) << ns << ".find(" << query.toString() << ')' << endl; - cursor.reset( _conn->query( ns, query, 0, 0, nullptr, _tailingQueryOptions ).release() ); - } - - void OplogReader::tailingQueryGTE(const char *ns, Timestamp optime) { - BSONObjBuilder gte; - gte.append("$gte", optime); - BSONObjBuilder query; - query.append("ts", gte.done()); - tailingQuery(ns, query.done()); - } - - HostAndPort OplogReader::getHost() const { - return _host; +void OplogReader::tailCheck() { + if (cursor.get() && cursor->isDead()) { + log() << "old cursor isDead, will initiate a new one" << std::endl; + resetCursor(); } - - void OplogReader::connectToSyncSource(OperationContext* txn, - const OpTime& lastOpTimeFetched, - ReplicationCoordinator* replCoord) { - const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0); - const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max()); - OpTime oldestOpTimeSeen = sentinel; - - invariant(conn() == NULL); - - while (true) { - HostAndPort candidate = replCoord->chooseNewSyncSource(); - - if (candidate.empty()) { - if (oldestOpTimeSeen == sentinel) { - // If, in this invocation of connectToSyncSource(), we did not successfully - // connect to any node ahead of us, - // we apparently have no sync sources to connect to. - // This situation is common; e.g. if there are no writes to the primary at - // the moment. - return; - } - - // Connected to at least one member, but in all cases we were too stale to use them - // as a sync source. - error() << "too stale to catch up"; - log() << "our last optime : " << lastOpTimeFetched; - log() << "oldest available is " << oldestOpTimeSeen; - log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; - setMinValid(txn, oldestOpTimeSeen); - bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING); - if (!worked) { - warning() << "Failed to transition into " - << MemberState(MemberState::RS_RECOVERING) - << ". Current state: " << replCoord->getMemberState(); - } +} + +void OplogReader::query( + const char* ns, Query query, int nToReturn, int nToSkip, const BSONObj* fields) { + cursor.reset( + _conn->query(ns, query, nToReturn, nToSkip, fields, QueryOption_SlaveOk).release()); +} + +void OplogReader::tailingQuery(const char* ns, const BSONObj& query) { + verify(!haveCursor()); + LOG(2) << ns << ".find(" << query.toString() << ')' << endl; + cursor.reset(_conn->query(ns, query, 0, 0, nullptr, _tailingQueryOptions).release()); +} + +void OplogReader::tailingQueryGTE(const char* ns, Timestamp optime) { + BSONObjBuilder gte; + gte.append("$gte", optime); + BSONObjBuilder query; + query.append("ts", gte.done()); + tailingQuery(ns, query.done()); +} + +HostAndPort OplogReader::getHost() const { + return _host; +} + +void OplogReader::connectToSyncSource(OperationContext* txn, + const OpTime& lastOpTimeFetched, + ReplicationCoordinator* replCoord) { + const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0); + const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max()); + OpTime oldestOpTimeSeen = sentinel; + + invariant(conn() == NULL); + + while (true) { + HostAndPort candidate = replCoord->chooseNewSyncSource(); + + if (candidate.empty()) { + if (oldestOpTimeSeen == sentinel) { + // If, in this invocation of connectToSyncSource(), we did not successfully + // connect to any node ahead of us, + // we apparently have no sync sources to connect to. + // This situation is common; e.g. if there are no writes to the primary at + // the moment. return; } - if (!connect(candidate)) { - LOG(2) << "can't connect to " << candidate.toString() << - " to read operations"; - resetConnection(); - replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10)); - continue; + // Connected to at least one member, but in all cases we were too stale to use them + // as a sync source. + error() << "too stale to catch up"; + log() << "our last optime : " << lastOpTimeFetched; + log() << "oldest available is " << oldestOpTimeSeen; + log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; + setMinValid(txn, oldestOpTimeSeen); + bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING); + if (!worked) { + warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING) + << ". Current state: " << replCoord->getMemberState(); } - // Read the first (oldest) op and confirm that it's not newer than our last - // fetched op. Otherwise, we have fallen off the back of that source's oplog. - BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query())); - OpTime remoteOldOpTime = extractOpTime(remoteOldestOp); - - if (lastOpTimeFetched < remoteOldOpTime) { - // We're too stale to use this sync source. - resetConnection(); - replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10)); - if (oldestOpTimeSeen > remoteOldOpTime) { - warning() << "we are too stale to use " << candidate.toString() << - " as a sync source"; - oldestOpTimeSeen = remoteOldOpTime; - } - continue; + return; + } + + if (!connect(candidate)) { + LOG(2) << "can't connect to " << candidate.toString() << " to read operations"; + resetConnection(); + replCoord->blacklistSyncSource(candidate, Date_t::now() + Seconds(10)); + continue; + } + // Read the first (oldest) op and confirm that it's not newer than our last + // fetched op. Otherwise, we have fallen off the back of that source's oplog. + BSONObj remoteOldestOp(findOne(rsOplogName.c_str(), Query())); + OpTime remoteOldOpTime = extractOpTime(remoteOldestOp); + + if (lastOpTimeFetched < remoteOldOpTime) { + // We're too stale to use this sync source. + resetConnection(); + replCoord->blacklistSyncSource(candidate, Date_t::now() + Minutes(10)); + if (oldestOpTimeSeen > remoteOldOpTime) { + warning() << "we are too stale to use " << candidate.toString() + << " as a sync source"; + oldestOpTimeSeen = remoteOldOpTime; } + continue; + } - // Got a valid sync source. - return; - } // while (true) - } + // Got a valid sync source. + return; + } // while (true) +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |