diff options
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.h')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 403 |
1 files changed, 199 insertions, 204 deletions
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 69f3caa1f18..cf69d7f44ef 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -48,217 +48,212 @@ namespace mongo { namespace repl { - class CollectionCloner : public BaseCloner { - MONGO_DISALLOW_COPYING(CollectionCloner); - public: - - /** - * Storage interface for collection cloner. - * - * Supports the operations on the storage layer required by the cloner. - */ - class StorageInterface; - - /** - * Type of function to schedule database work with the executor. - * - * Must be consistent with ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(). - * - * Used for testing only. - */ - using ScheduleDbWorkFn = stdx::function<StatusWith<ReplicationExecutor::CallbackHandle> ( - const ReplicationExecutor::CallbackFn&)>; - - /** - * Creates CollectionCloner task in inactive state. Use start() to activate cloner. - * - * The cloner calls 'onCompletion' when the collection cloning has completed or failed. - * - * 'onCompletion' will be called exactly once. - * - * Takes ownership of the passed StorageInterface object. - */ - CollectionCloner(ReplicationExecutor* executor, - const HostAndPort& source, - const NamespaceString& sourceNss, - const CollectionOptions& options, - const CallbackFn& onCompletion, - StorageInterface* storageInterface); - - virtual ~CollectionCloner(); - - const NamespaceString& getSourceNamespace() const; - - std::string getDiagnosticString() const override; - - bool isActive() const override; - - Status start() override; - - void cancel() override; - - void wait() override; - - // - // Testing only functions below. - // - - /** - * Waits for database worker to complete. - * Returns immediately if collection cloner is not active. - * - * For testing only. - */ - void waitForDbWorker(); - - /** - * Overrides how executor schedules database work. - * - * For testing only. - */ - void setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn); - - private: - - /** - * Read index specs from listIndexes result. - */ - void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob); - - /** - * Read collection documents from find result. - */ - void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob); - - /** - * Request storage interface to create collection. - * - * Called multiple times if there are more than one batch of responses from listIndexes - * cursor. - * - * 'nextAction' is an in/out arg indicating the next action planned and to be taken - * by the fetcher. - */ - void _beginCollectionCallback(const ReplicationExecutor::CallbackArgs& callbackData); - - /** - * Called multiple times if there are more than one batch of documents from the fetcher. - * On the last batch, 'lastBatch' will be true. - * - * Each document returned will be inserted via the storage interfaceRequest storage - * interface. - */ - void _insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& callbackData, - bool lastBatch); - - /** - * Reports completion status. - * Commits/aborts collection building. - * Sets cloner to inactive. - */ - void _finishCallback(OperationContext* txn, const Status& status); - - // Not owned by us. - ReplicationExecutor* _executor; - - HostAndPort _source; - NamespaceString _sourceNss; - NamespaceString _destNss; - CollectionOptions _options; - - // Invoked once when cloning completes or fails. - CallbackFn _onCompletion; - - // Not owned by us. - StorageInterface* _storageInterface; - - // Protects member data of this collection cloner. - mutable stdx::mutex _mutex; +class CollectionCloner : public BaseCloner { + MONGO_DISALLOW_COPYING(CollectionCloner); - mutable stdx::condition_variable _condition; +public: + /** + * Storage interface for collection cloner. + * + * Supports the operations on the storage layer required by the cloner. + */ + class StorageInterface; + + /** + * Type of function to schedule database work with the executor. + * + * Must be consistent with ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(). + * + * Used for testing only. + */ + using ScheduleDbWorkFn = stdx::function<StatusWith<ReplicationExecutor::CallbackHandle>( + const ReplicationExecutor::CallbackFn&)>; + + /** + * Creates CollectionCloner task in inactive state. Use start() to activate cloner. + * + * The cloner calls 'onCompletion' when the collection cloning has completed or failed. + * + * 'onCompletion' will be called exactly once. + * + * Takes ownership of the passed StorageInterface object. + */ + CollectionCloner(ReplicationExecutor* executor, + const HostAndPort& source, + const NamespaceString& sourceNss, + const CollectionOptions& options, + const CallbackFn& onCompletion, + StorageInterface* storageInterface); + + virtual ~CollectionCloner(); + + const NamespaceString& getSourceNamespace() const; + + std::string getDiagnosticString() const override; + + bool isActive() const override; + + Status start() override; - // _active is true when Collection Cloner is started. - bool _active; + void cancel() override; - // Fetcher instances for running listIndexes and find commands. - Fetcher _listIndexesFetcher; - Fetcher _findFetcher; + void wait() override; - std::vector<BSONObj> _indexSpecs; - - // Current batch of documents read from fetcher to insert into collection. - std::vector<BSONObj> _documents; - - // Callback handle for database worker. - ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle; - - // Function for scheduling database work using the executor. - ScheduleDbWorkFn _scheduleDbWorkFn; - - }; + // + // Testing only functions below. + // /** - * Storage interface used by the collection cloner to build a collection. + * Waits for database worker to complete. + * Returns immediately if collection cloner is not active. * - * Operation context is provided by the replication executor via the cloner. + * For testing only. + */ + void waitForDbWorker(); + + /** + * Overrides how executor schedules database work. * - * The storage interface is expected to acquire locks on any resources it needs - * to perform any of its functions. + * For testing only. + */ + void setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn); + +private: + /** + * Read index specs from listIndexes result. + */ + void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob); + + /** + * Read collection documents from find result. + */ + void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob); + + /** + * Request storage interface to create collection. + * + * Called multiple times if there are more than one batch of responses from listIndexes + * cursor. * - * TODO: Consider having commit/abort/cancel functions. + * 'nextAction' is an in/out arg indicating the next action planned and to be taken + * by the fetcher. */ - class CollectionCloner::StorageInterface { - public: - - virtual ~StorageInterface() = default; - - /** - * Creates a collection with the provided indexes. - * - * Assume that no database locks have been acquired prior to calling this - * function. - */ - virtual Status beginCollection(OperationContext* txn, - const NamespaceString& nss, - const CollectionOptions& options, - const std::vector<BSONObj>& indexSpecs) = 0; - - /** - * Inserts documents into a collection. - * - * Assume that no database locks have been acquired prior to calling this - * function. - */ - virtual Status insertDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& documents) = 0; - - /** - * Commits changes to collection. No effect if collection building has not begun. - * Operation context could be null. - */ - virtual Status commitCollection(OperationContext* txn, - const NamespaceString& nss) = 0; - - /** - * Inserts missing document into a collection (not related to insertDocuments above), - * during initial sync retry logic - */ - virtual Status insertMissingDoc(OperationContext* txn, - const NamespaceString& nss, - const BSONObj& doc) = 0; - - /** - * Inserts missing document into a collection (not related to insertDocuments above), - * during initial sync retry logic - */ - virtual Status dropUserDatabases(OperationContext* txn) = 0; - - }; - -} // namespace repl -} // namespace mongo + void _beginCollectionCallback(const ReplicationExecutor::CallbackArgs& callbackData); + + /** + * Called multiple times if there are more than one batch of documents from the fetcher. + * On the last batch, 'lastBatch' will be true. + * + * Each document returned will be inserted via the storage interfaceRequest storage + * interface. + */ + void _insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& callbackData, + bool lastBatch); + + /** + * Reports completion status. + * Commits/aborts collection building. + * Sets cloner to inactive. + */ + void _finishCallback(OperationContext* txn, const Status& status); + + // Not owned by us. + ReplicationExecutor* _executor; + + HostAndPort _source; + NamespaceString _sourceNss; + NamespaceString _destNss; + CollectionOptions _options; + + // Invoked once when cloning completes or fails. + CallbackFn _onCompletion; + + // Not owned by us. + StorageInterface* _storageInterface; + + // Protects member data of this collection cloner. + mutable stdx::mutex _mutex; + + mutable stdx::condition_variable _condition; + + // _active is true when Collection Cloner is started. + bool _active; + + // Fetcher instances for running listIndexes and find commands. + Fetcher _listIndexesFetcher; + Fetcher _findFetcher; + + std::vector<BSONObj> _indexSpecs; + + // Current batch of documents read from fetcher to insert into collection. + std::vector<BSONObj> _documents; + + // Callback handle for database worker. + ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle; + + // Function for scheduling database work using the executor. + ScheduleDbWorkFn _scheduleDbWorkFn; +}; + +/** + * Storage interface used by the collection cloner to build a collection. + * + * Operation context is provided by the replication executor via the cloner. + * + * The storage interface is expected to acquire locks on any resources it needs + * to perform any of its functions. + * + * TODO: Consider having commit/abort/cancel functions. + */ +class CollectionCloner::StorageInterface { +public: + virtual ~StorageInterface() = default; + + /** + * Creates a collection with the provided indexes. + * + * Assume that no database locks have been acquired prior to calling this + * function. + */ + virtual Status beginCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options, + const std::vector<BSONObj>& indexSpecs) = 0; + + /** + * Inserts documents into a collection. + * + * Assume that no database locks have been acquired prior to calling this + * function. + */ + virtual Status insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& documents) = 0; + + /** + * Commits changes to collection. No effect if collection building has not begun. + * Operation context could be null. + */ + virtual Status commitCollection(OperationContext* txn, const NamespaceString& nss) = 0; + + /** + * Inserts missing document into a collection (not related to insertDocuments above), + * during initial sync retry logic + */ + virtual Status insertMissingDoc(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) = 0; + + /** + * Inserts missing document into a collection (not related to insertDocuments above), + * during initial sync retry logic + */ + virtual Status dropUserDatabases(OperationContext* txn) = 0; +}; + +} // namespace repl +} // namespace mongo |