diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail.h')
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 286 |
1 files changed, 144 insertions, 142 deletions
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 74f17cdea12..2e4424b71cf 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -38,160 +38,162 @@ namespace mongo { - class Database; - class OperationContext; +class Database; +class OperationContext; namespace repl { - class BackgroundSyncInterface; - class ReplicationCoordinator; - class OpTime; +class BackgroundSyncInterface; +class ReplicationCoordinator; +class OpTime; + +/** + * "Normal" replica set syncing + */ +class SyncTail { +public: + using MultiSyncApplyFunc = stdx::function<void(const std::vector<BSONObj>& ops, SyncTail* st)>; + + /** + * Type of function that takes a non-command op and applies it locally. + * Used for applying from an oplog. + * Last boolean argument 'convertUpdateToUpsert' converts some updates to upserts for + * idempotency reasons. + * Returns failure status if the op was an update that could not be applied. + */ + using ApplyOperationInLockFn = + stdx::function<Status(OperationContext*, Database*, const BSONObj&, bool)>; + + /** + * Type of function that takes a command op and applies it locally. + * Used for applying from an oplog. + * Returns failure status if the op that could not be applied. + */ + using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&)>; + + /** + * Type of function to increment "repl.apply.ops" server status metric. + */ + using IncrementOpsAppliedStatsFn = stdx::function<void()>; + + SyncTail(BackgroundSyncInterface* q, MultiSyncApplyFunc func); + virtual ~SyncTail(); /** - * "Normal" replica set syncing + * Applies the operation that is in param o. + * Functions for applying operations/commands and increment server status counters may + * be overridden for testing. */ - class SyncTail { + static Status syncApply(OperationContext* txn, + const BSONObj& o, + bool convertUpdateToUpsert, + ApplyOperationInLockFn applyOperationInLock, + ApplyCommandInLockFn applyCommandInLock, + IncrementOpsAppliedStatsFn incrementOpsAppliedStats); + + static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert); + + /** + * Runs _applyOplogUntil(stopOpTime) + */ + virtual void oplogApplication(OperationContext* txn, const OpTime& stopOpTime); + + void oplogApplication(); + bool peek(BSONObj* obj); + + class OpQueue { public: - using MultiSyncApplyFunc = - stdx::function<void (const std::vector<BSONObj>& ops, SyncTail* st)>; - - /** - * Type of function that takes a non-command op and applies it locally. - * Used for applying from an oplog. - * Last boolean argument 'convertUpdateToUpsert' converts some updates to upserts for - * idempotency reasons. - * Returns failure status if the op was an update that could not be applied. - */ - using ApplyOperationInLockFn = - stdx::function<Status (OperationContext*, Database*, const BSONObj&, bool)>; - - /** - * Type of function that takes a command op and applies it locally. - * Used for applying from an oplog. - * Returns failure status if the op that could not be applied. - */ - using ApplyCommandInLockFn = stdx::function<Status (OperationContext*, const BSONObj&)>; - - /** - * Type of function to increment "repl.apply.ops" server status metric. - */ - using IncrementOpsAppliedStatsFn = stdx::function<void ()>; - - SyncTail(BackgroundSyncInterface *q, MultiSyncApplyFunc func); - virtual ~SyncTail(); - - /** - * Applies the operation that is in param o. - * Functions for applying operations/commands and increment server status counters may - * be overridden for testing. - */ - static Status syncApply(OperationContext* txn, - const BSONObj &o, - bool convertUpdateToUpsert, - ApplyOperationInLockFn applyOperationInLock, - ApplyCommandInLockFn applyCommandInLock, - IncrementOpsAppliedStatsFn incrementOpsAppliedStats); - - static Status syncApply(OperationContext* txn, - const BSONObj &o, - bool convertUpdateToUpsert); - - /** - * Runs _applyOplogUntil(stopOpTime) - */ - virtual void oplogApplication(OperationContext* txn, const OpTime& stopOpTime); - - void oplogApplication(); - bool peek(BSONObj* obj); - - class OpQueue { - public: - OpQueue() : _size(0) {} - size_t getSize() const { return _size; } - const std::deque<BSONObj>& getDeque() const { return _deque; } - void push_back(BSONObj& op) { - _deque.push_back(op); - _size += op.objsize(); - } - bool empty() const { - return _deque.empty(); - } - - BSONObj back() const { - invariant(!_deque.empty()); - return _deque.back(); - } - - private: - std::deque<BSONObj> _deque; - size_t _size; - }; - - // returns true if we should continue waiting for BSONObjs, false if we should - // stop waiting and apply the queue we have. Only returns false if !ops.empty(). - bool tryPopAndWaitForMore(OperationContext* txn, - OpQueue* ops, - ReplicationCoordinator* replCoord); - - /** - * Fetch a single document referenced in the operation from the sync source. - */ - virtual BSONObj getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o); - - /** - * If applyOperation_inlock should be called again after an update fails. - */ - virtual bool shouldRetry(OperationContext* txn, const BSONObj& o); - void setHostname(const std::string& hostname); - - protected: - // Cap the batches using the limit on journal commits. - // This works out to be 100 MB (64 bit) or 50 MB (32 bit) - static const unsigned int replBatchLimitBytes = dur::UncommittedBytesLimit; - static const int replBatchLimitSeconds = 1; - static const unsigned int replBatchLimitOperations = 5000; - - // SyncTail base class always supports awaiting commit if any op has j:true flag - // that indicates awaiting commit before updating last OpTime. - virtual bool supportsWaitingUntilDurable() { return true; } - - // Prefetch and write a deque of operations, using the supplied function. - // Initial Sync and Sync Tail each use a different function. - // Returns the last OpTime applied. - static OpTime multiApply(OperationContext* txn, - const OpQueue& ops, - OldThreadPool* prefetcherPool, - OldThreadPool* writerPool, - MultiSyncApplyFunc func, - SyncTail* sync, - bool supportsAwaitingCommit); - - /** - * Applies oplog entries until reaching "endOpTime". - * - * NOTE:Will not transition or check states - */ - void _applyOplogUntil(OperationContext* txn, const OpTime& endOpTime); + OpQueue() : _size(0) {} + size_t getSize() const { + return _size; + } + const std::deque<BSONObj>& getDeque() const { + return _deque; + } + void push_back(BSONObj& op) { + _deque.push_back(op); + _size += op.objsize(); + } + bool empty() const { + return _deque.empty(); + } + + BSONObj back() const { + invariant(!_deque.empty()); + return _deque.back(); + } private: - std::string _hostname; + std::deque<BSONObj> _deque; + size_t _size; + }; - BackgroundSyncInterface* _networkQueue; + // returns true if we should continue waiting for BSONObjs, false if we should + // stop waiting and apply the queue we have. Only returns false if !ops.empty(). + bool tryPopAndWaitForMore(OperationContext* txn, + OpQueue* ops, + ReplicationCoordinator* replCoord); - // Function to use during applyOps - MultiSyncApplyFunc _applyFunc; + /** + * Fetch a single document referenced in the operation from the sync source. + */ + virtual BSONObj getMissingDoc(OperationContext* txn, Database* db, const BSONObj& o); - void handleSlaveDelay(const BSONObj& op); + /** + * If applyOperation_inlock should be called again after an update fails. + */ + virtual bool shouldRetry(OperationContext* txn, const BSONObj& o); + void setHostname(const std::string& hostname); + +protected: + // Cap the batches using the limit on journal commits. + // This works out to be 100 MB (64 bit) or 50 MB (32 bit) + static const unsigned int replBatchLimitBytes = dur::UncommittedBytesLimit; + static const int replBatchLimitSeconds = 1; + static const unsigned int replBatchLimitOperations = 5000; + + // SyncTail base class always supports awaiting commit if any op has j:true flag + // that indicates awaiting commit before updating last OpTime. + virtual bool supportsWaitingUntilDurable() { + return true; + } + + // Prefetch and write a deque of operations, using the supplied function. + // Initial Sync and Sync Tail each use a different function. + // Returns the last OpTime applied. + static OpTime multiApply(OperationContext* txn, + const OpQueue& ops, + OldThreadPool* prefetcherPool, + OldThreadPool* writerPool, + MultiSyncApplyFunc func, + SyncTail* sync, + bool supportsAwaitingCommit); - // persistent pool of worker threads for writing ops to the databases - OldThreadPool _writerPool; - // persistent pool of worker threads for prefetching - OldThreadPool _prefetcherPool; + /** + * Applies oplog entries until reaching "endOpTime". + * + * NOTE:Will not transition or check states + */ + void _applyOplogUntil(OperationContext* txn, const OpTime& endOpTime); - }; +private: + std::string _hostname; + + BackgroundSyncInterface* _networkQueue; + + // Function to use during applyOps + MultiSyncApplyFunc _applyFunc; + + void handleSlaveDelay(const BSONObj& op); + + // persistent pool of worker threads for writing ops to the databases + OldThreadPool _writerPool; + // persistent pool of worker threads for prefetching + OldThreadPool _prefetcherPool; +}; - // These free functions are used by the thread pool workers to write ops to the db. - void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); - void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); +// These free functions are used by the thread pool workers to write ops to the db. +void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); +void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st); -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |