summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplogreader.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplogreader.h')
-rw-r--r--src/mongo/db/repl/oplogreader.h223
1 files changed, 116 insertions, 107 deletions
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 63dcaaeaa20..718fa162d88 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -38,117 +38,126 @@
namespace mongo {
- class OperationContext;
+class OperationContext;
namespace repl {
- class ReplicationCoordinator;
- class OpTime;
+class ReplicationCoordinator;
+class OpTime;
- // {"$natural": -1 }
- extern const BSONObj reverseNaturalObj;
+// {"$natural": -1 }
+extern const BSONObj reverseNaturalObj;
+
+/**
+ * Authenticates conn using the server's cluster-membership credentials.
+ *
+ * Returns true on successful authentication.
+ */
+bool replAuthenticate(DBClientBase* conn);
+
+/* started abstracting out the querying of the primary/master's oplog
+ still fairly awkward but a start.
+*/
+
+class OplogReader {
+private:
+ std::shared_ptr<DBClientConnection> _conn;
+ std::shared_ptr<DBClientCursor> cursor;
+ int _tailingQueryOptions;
+
+ // If _conn was actively connected, _host represents the current HostAndPort of the
+ // connection.
+ HostAndPort _host;
+
+public:
+ OplogReader();
+ ~OplogReader() {}
+ void resetCursor() {
+ cursor.reset();
+ }
+ void resetConnection() {
+ cursor.reset();
+ _conn.reset();
+ _host = HostAndPort();
+ }
+ DBClientConnection* conn() {
+ return _conn.get();
+ }
+ BSONObj findOne(const char* ns, const Query& q) {
+ return conn()->findOne(ns, q, 0, QueryOption_SlaveOk);
+ }
+ BSONObj getLastOp(const std::string& ns) {
+ return findOne(ns.c_str(), Query().sort(reverseNaturalObj));
+ }
+
+ /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */
+ static const int tcp_timeout = 30;
+
+ /* ok to call if already connected */
+ bool connect(const HostAndPort& host);
+
+ void tailCheck();
+
+ bool haveCursor() {
+ return cursor.get() != 0;
+ }
+
+ void query(const char* ns, Query query, int nToReturn, int nToSkip, const BSONObj* fields = 0);
+
+ void tailingQuery(const char* ns, const BSONObj& query);
+
+ void tailingQueryGTE(const char* ns, Timestamp t);
+
+ bool more() {
+ uassert(15910, "Doesn't have cursor for reading oplog", cursor.get());
+ return cursor->more();
+ }
+
+ bool moreInCurrentBatch() {
+ uassert(15911, "Doesn't have cursor for reading oplog", cursor.get());
+ return cursor->moreInCurrentBatch();
+ }
+
+ int currentBatchMessageSize() {
+ if (NULL == cursor->getMessage())
+ return 0;
+ return cursor->getMessage()->size();
+ }
+
+ BSONObj nextSafe() {
+ return cursor->nextSafe();
+ }
+ BSONObj next() {
+ return cursor->next();
+ }
+
+
+ // master/slave only
+ void peek(std::vector<BSONObj>& v, int n) {
+ if (cursor.get())
+ cursor->peek(v, n);
+ }
+
+ // master/slave only
+ void putBack(BSONObj op) {
+ cursor->putBack(op);
+ }
+
+ HostAndPort getHost() const;
/**
- * Authenticates conn using the server's cluster-membership credentials.
- *
- * Returns true on successful authentication.
+ * Connects this OplogReader to a valid sync source, using the provided lastOpTimeFetched
+ * and ReplicationCoordinator objects.
+ * If this function fails to connect to a sync source that is viable, this OplogReader
+ * is left unconnected, where this->conn() equals NULL.
+ * In the process of connecting, this function may add items to the repl coordinator's
+ * sync source blacklist.
+ * This function may throw DB exceptions.
*/
- bool replAuthenticate(DBClientBase* conn);
-
- /* started abstracting out the querying of the primary/master's oplog
- still fairly awkward but a start.
- */
-
- class OplogReader {
- private:
- std::shared_ptr<DBClientConnection> _conn;
- std::shared_ptr<DBClientCursor> cursor;
- int _tailingQueryOptions;
-
- // If _conn was actively connected, _host represents the current HostAndPort of the
- // connection.
- HostAndPort _host;
- public:
- OplogReader();
- ~OplogReader() { }
- void resetCursor() { cursor.reset(); }
- void resetConnection() {
- cursor.reset();
- _conn.reset();
- _host = HostAndPort();
- }
- DBClientConnection* conn() { return _conn.get(); }
- BSONObj findOne(const char *ns, const Query& q) {
- return conn()->findOne(ns, q, 0, QueryOption_SlaveOk);
- }
- BSONObj getLastOp(const std::string& ns) {
- return findOne(ns.c_str(), Query().sort(reverseNaturalObj));
- }
-
- /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */
- static const int tcp_timeout = 30;
-
- /* ok to call if already connected */
- bool connect(const HostAndPort& host);
-
- void tailCheck();
-
- bool haveCursor() { return cursor.get() != 0; }
-
- void query(const char *ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fields=0);
-
- void tailingQuery(const char *ns, const BSONObj& query);
-
- void tailingQueryGTE(const char *ns, Timestamp t);
-
- bool more() {
- uassert( 15910, "Doesn't have cursor for reading oplog", cursor.get() );
- return cursor->more();
- }
-
- bool moreInCurrentBatch() {
- uassert( 15911, "Doesn't have cursor for reading oplog", cursor.get() );
- return cursor->moreInCurrentBatch();
- }
-
- int currentBatchMessageSize() {
- if( NULL == cursor->getMessage() )
- return 0;
- return cursor->getMessage()->size();
- }
-
- BSONObj nextSafe() { return cursor->nextSafe(); }
- BSONObj next() { return cursor->next(); }
-
-
- // master/slave only
- void peek(std::vector<BSONObj>& v, int n) {
- if( cursor.get() )
- cursor->peek(v,n);
- }
-
- // master/slave only
- void putBack(BSONObj op) { cursor->putBack(op); }
-
- HostAndPort getHost() const;
-
- /**
- * Connects this OplogReader to a valid sync source, using the provided lastOpTimeFetched
- * and ReplicationCoordinator objects.
- * If this function fails to connect to a sync source that is viable, this OplogReader
- * is left unconnected, where this->conn() equals NULL.
- * In the process of connecting, this function may add items to the repl coordinator's
- * sync source blacklist.
- * This function may throw DB exceptions.
- */
- void connectToSyncSource(OperationContext* txn,
- const OpTime& lastOpTimeFetched,
- ReplicationCoordinator* replCoord);
- };
-
-} // namespace repl
-} // namespace mongo
+ void connectToSyncSource(OperationContext* txn,
+ const OpTime& lastOpTimeFetched,
+ ReplicationCoordinator* replCoord);
+};
+
+} // namespace repl
+} // namespace mongo