/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include #include #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/fetcher.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/executor/task_executor.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/net/hostandport.h" namespace mongo { namespace repl { class StorageInterface; class DatabaseCloner : public BaseCloner { MONGO_DISALLOW_COPYING(DatabaseCloner); public: struct Stats { std::string dbname; Date_t start; Date_t end; size_t collections{0}; size_t clonedCollections{0}; std::vector collectionStats; std::string toString() const; BSONObj toBSON() const; void append(BSONObjBuilder* builder) const; }; /** * Predicate used on the collection info objects returned by listCollections. * Each collection info is represented by a document in the following format: * { * name: , * options: * } * * Returns true if the collection described by the info object should be cloned. * Returns false if the collection should be ignored. */ using ListCollectionsPredicateFn = stdx::function; /** * Callback function to report progress of collection cloning. Arguments are: * - status from the collection cloner's 'onCompletion' callback. * - source namespace of the collection cloner that completed (or failed). * * Called exactly once for every collection cloner started by the the database cloner. */ using CollectionCallbackFn = stdx::function; /** * Type of function to start a collection cloner. */ using StartCollectionClonerFn = stdx::function; /** * Creates DatabaseCloner task in inactive state. Use start() to activate cloner. * * The cloner calls 'onCompletion' when the database cloning has completed or failed. * * 'onCompletion' will be called exactly once. * * Takes ownership of the passed StorageInterface object. * * 'listCollectionsFilter' will be extended to include collections only, filtering out views. */ DatabaseCloner(executor::TaskExecutor* executor, ThreadPool* dbWorkThreadPool, const HostAndPort& source, const std::string& dbname, const BSONObj& listCollectionsFilter, const ListCollectionsPredicateFn& listCollectionsPredicate, StorageInterface* storageInterface, const CollectionCallbackFn& collectionWork, const CallbackFn& onCompletion); virtual ~DatabaseCloner(); /** * Returns collection info objects read from listCollections result and will not include views. */ const std::vector& getCollectionInfos_forTest() const; bool isActive() const override; Status startup() noexcept override; void shutdown() override; void join() override; DatabaseCloner::Stats getStats() const; std::string getDBName() const; // // Testing only functions below. // /** * Overrides how executor schedules database work. * * For testing only. */ void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn); /** * Overrides how executor starts a collection cloner. * * For testing only */ void setStartCollectionClonerFn(const StartCollectionClonerFn& startCollectionCloner); // State transitions: // PreStart --> Running --> ShuttingDown --> Complete // It is possible to skip intermediate states. For example, // Calling shutdown() when the cloner has not started will transition from PreStart directly // to Complete. // This enum class is made public for testing. enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; /** * Returns current database cloner state. * For testing only. */ State getState_forTest() const; private: bool _isActive_inlock() const; /** * Returns whether the DatabaseCloner is in shutdown. */ bool _isShuttingDown() const; /** * Read collection names and options from listCollections result. */ void _listCollectionsCallback(const StatusWith& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob); /** * Forwards collection cloner result to client. * Starts a new cloner on a different collection. */ void _collectionClonerCallback(const Status& status, const NamespaceString& nss); /** * Reports completion status. * Sets cloner to inactive. */ void _finishCallback(const Status& status); /** * Calls the above method after unlocking. */ void _finishCallback_inlock(stdx::unique_lock& lk, const Status& status); // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // // (R) Read-only in concurrent operation; no synchronization required. // (M) Reads and writes guarded by _mutex // (S) Self-synchronizing; access in any way from any context. // (RT) Read-only in concurrent operation; synchronized externally by tests // mutable stdx::mutex _mutex; mutable stdx::condition_variable _condition; // (M) executor::TaskExecutor* _executor; // (R) ThreadPool* _dbWorkThreadPool; // (R) const HostAndPort _source; // (R) const std::string _dbname; // (R) const BSONObj _listCollectionsFilter; // (R) const ListCollectionsPredicateFn _listCollectionsPredicate; // (R) StorageInterface* _storageInterface; // (R) CollectionCallbackFn _collectionWork; // (R) Invoked once for every successfully started collection cloner. CallbackFn _onCompletion; // (R) Invoked once when cloning completes or fails. Fetcher _listCollectionsFetcher; // (R) Fetcher instance for running listCollections command. // Collection info objects returned from listCollections. // Format of each document: // { // name: , // options: // } // Holds all collection infos from listCollections. std::vector _collectionInfos; // (M) std::vector _collectionNamespaces; // (M) std::list _collectionCloners; // (M) std::list::iterator _currentCollectionClonerIter; // (M) std::vector> _failedNamespaces; // (M) CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. StartCollectionClonerFn _startCollectionCloner; // (RT) Stats _stats; // (M) Stats about what this instance did. // Current database cloner state. See comments for State enum class for details. State _state = State::kPreStart; // (M) }; /** * Insertion operator for DatabaseCloner::State. Formats database cloner state for output stream. * For testing only. */ std::ostream& operator<<(std::ostream& os, const DatabaseCloner::State& state); } // namespace repl } // namespace mongo