diff options
Diffstat (limited to 'src/mongo/db/repl/oplogreader.h')
-rw-r--r-- | src/mongo/db/repl/oplogreader.h | 223 |
1 files changed, 116 insertions, 107 deletions
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 63dcaaeaa20..718fa162d88 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -38,117 +38,126 @@ namespace mongo { - class OperationContext; +class OperationContext; namespace repl { - class ReplicationCoordinator; - class OpTime; +class ReplicationCoordinator; +class OpTime; - // {"$natural": -1 } - extern const BSONObj reverseNaturalObj; +// {"$natural": -1 } +extern const BSONObj reverseNaturalObj; + +/** + * Authenticates conn using the server's cluster-membership credentials. + * + * Returns true on successful authentication. + */ +bool replAuthenticate(DBClientBase* conn); + +/* started abstracting out the querying of the primary/master's oplog + still fairly awkward but a start. +*/ + +class OplogReader { +private: + std::shared_ptr<DBClientConnection> _conn; + std::shared_ptr<DBClientCursor> cursor; + int _tailingQueryOptions; + + // If _conn was actively connected, _host represents the current HostAndPort of the + // connection. + HostAndPort _host; + +public: + OplogReader(); + ~OplogReader() {} + void resetCursor() { + cursor.reset(); + } + void resetConnection() { + cursor.reset(); + _conn.reset(); + _host = HostAndPort(); + } + DBClientConnection* conn() { + return _conn.get(); + } + BSONObj findOne(const char* ns, const Query& q) { + return conn()->findOne(ns, q, 0, QueryOption_SlaveOk); + } + BSONObj getLastOp(const std::string& ns) { + return findOne(ns.c_str(), Query().sort(reverseNaturalObj)); + } + + /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ + static const int tcp_timeout = 30; + + /* ok to call if already connected */ + bool connect(const HostAndPort& host); + + void tailCheck(); + + bool haveCursor() { + return cursor.get() != 0; + } + + void query(const char* ns, Query query, int nToReturn, int nToSkip, const BSONObj* fields = 0); + + void tailingQuery(const char* ns, const BSONObj& query); + + void tailingQueryGTE(const char* ns, Timestamp t); + + bool more() { + uassert(15910, "Doesn't have cursor for reading oplog", cursor.get()); + return cursor->more(); + } + + bool moreInCurrentBatch() { + uassert(15911, "Doesn't have cursor for reading oplog", cursor.get()); + return cursor->moreInCurrentBatch(); + } + + int currentBatchMessageSize() { + if (NULL == cursor->getMessage()) + return 0; + return cursor->getMessage()->size(); + } + + BSONObj nextSafe() { + return cursor->nextSafe(); + } + BSONObj next() { + return cursor->next(); + } + + + // master/slave only + void peek(std::vector<BSONObj>& v, int n) { + if (cursor.get()) + cursor->peek(v, n); + } + + // master/slave only + void putBack(BSONObj op) { + cursor->putBack(op); + } + + HostAndPort getHost() const; /** - * Authenticates conn using the server's cluster-membership credentials. - * - * Returns true on successful authentication. + * Connects this OplogReader to a valid sync source, using the provided lastOpTimeFetched + * and ReplicationCoordinator objects. + * If this function fails to connect to a sync source that is viable, this OplogReader + * is left unconnected, where this->conn() equals NULL. + * In the process of connecting, this function may add items to the repl coordinator's + * sync source blacklist. + * This function may throw DB exceptions. */ - bool replAuthenticate(DBClientBase* conn); - - /* started abstracting out the querying of the primary/master's oplog - still fairly awkward but a start. - */ - - class OplogReader { - private: - std::shared_ptr<DBClientConnection> _conn; - std::shared_ptr<DBClientCursor> cursor; - int _tailingQueryOptions; - - // If _conn was actively connected, _host represents the current HostAndPort of the - // connection. - HostAndPort _host; - public: - OplogReader(); - ~OplogReader() { } - void resetCursor() { cursor.reset(); } - void resetConnection() { - cursor.reset(); - _conn.reset(); - _host = HostAndPort(); - } - DBClientConnection* conn() { return _conn.get(); } - BSONObj findOne(const char *ns, const Query& q) { - return conn()->findOne(ns, q, 0, QueryOption_SlaveOk); - } - BSONObj getLastOp(const std::string& ns) { - return findOne(ns.c_str(), Query().sort(reverseNaturalObj)); - } - - /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ - static const int tcp_timeout = 30; - - /* ok to call if already connected */ - bool connect(const HostAndPort& host); - - void tailCheck(); - - bool haveCursor() { return cursor.get() != 0; } - - void query(const char *ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fields=0); - - void tailingQuery(const char *ns, const BSONObj& query); - - void tailingQueryGTE(const char *ns, Timestamp t); - - bool more() { - uassert( 15910, "Doesn't have cursor for reading oplog", cursor.get() ); - return cursor->more(); - } - - bool moreInCurrentBatch() { - uassert( 15911, "Doesn't have cursor for reading oplog", cursor.get() ); - return cursor->moreInCurrentBatch(); - } - - int currentBatchMessageSize() { - if( NULL == cursor->getMessage() ) - return 0; - return cursor->getMessage()->size(); - } - - BSONObj nextSafe() { return cursor->nextSafe(); } - BSONObj next() { return cursor->next(); } - - - // master/slave only - void peek(std::vector<BSONObj>& v, int n) { - if( cursor.get() ) - cursor->peek(v,n); - } - - // master/slave only - void putBack(BSONObj op) { cursor->putBack(op); } - - HostAndPort getHost() const; - - /** - * Connects this OplogReader to a valid sync source, using the provided lastOpTimeFetched - * and ReplicationCoordinator objects. - * If this function fails to connect to a sync source that is viable, this OplogReader - * is left unconnected, where this->conn() equals NULL. - * In the process of connecting, this function may add items to the repl coordinator's - * sync source blacklist. - * This function may throw DB exceptions. - */ - void connectToSyncSource(OperationContext* txn, - const OpTime& lastOpTimeFetched, - ReplicationCoordinator* replCoord); - }; - -} // namespace repl -} // namespace mongo + void connectToSyncSource(OperationContext* txn, + const OpTime& lastOpTimeFetched, + ReplicationCoordinator* replCoord); +}; + +} // namespace repl +} // namespace mongo |