summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.h')
-rw-r--r--src/mongo/db/repl/collection_cloner.h403
1 files changed, 199 insertions, 204 deletions
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 69f3caa1f18..cf69d7f44ef 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -48,217 +48,212 @@
namespace mongo {
namespace repl {
- class CollectionCloner : public BaseCloner {
- MONGO_DISALLOW_COPYING(CollectionCloner);
- public:
-
- /**
- * Storage interface for collection cloner.
- *
- * Supports the operations on the storage layer required by the cloner.
- */
- class StorageInterface;
-
- /**
- * Type of function to schedule database work with the executor.
- *
- * Must be consistent with ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock().
- *
- * Used for testing only.
- */
- using ScheduleDbWorkFn = stdx::function<StatusWith<ReplicationExecutor::CallbackHandle> (
- const ReplicationExecutor::CallbackFn&)>;
-
- /**
- * Creates CollectionCloner task in inactive state. Use start() to activate cloner.
- *
- * The cloner calls 'onCompletion' when the collection cloning has completed or failed.
- *
- * 'onCompletion' will be called exactly once.
- *
- * Takes ownership of the passed StorageInterface object.
- */
- CollectionCloner(ReplicationExecutor* executor,
- const HostAndPort& source,
- const NamespaceString& sourceNss,
- const CollectionOptions& options,
- const CallbackFn& onCompletion,
- StorageInterface* storageInterface);
-
- virtual ~CollectionCloner();
-
- const NamespaceString& getSourceNamespace() const;
-
- std::string getDiagnosticString() const override;
-
- bool isActive() const override;
-
- Status start() override;
-
- void cancel() override;
-
- void wait() override;
-
- //
- // Testing only functions below.
- //
-
- /**
- * Waits for database worker to complete.
- * Returns immediately if collection cloner is not active.
- *
- * For testing only.
- */
- void waitForDbWorker();
-
- /**
- * Overrides how executor schedules database work.
- *
- * For testing only.
- */
- void setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn);
-
- private:
-
- /**
- * Read index specs from listIndexes result.
- */
- void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob);
-
- /**
- * Read collection documents from find result.
- */
- void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob);
-
- /**
- * Request storage interface to create collection.
- *
- * Called multiple times if there are more than one batch of responses from listIndexes
- * cursor.
- *
- * 'nextAction' is an in/out arg indicating the next action planned and to be taken
- * by the fetcher.
- */
- void _beginCollectionCallback(const ReplicationExecutor::CallbackArgs& callbackData);
-
- /**
- * Called multiple times if there are more than one batch of documents from the fetcher.
- * On the last batch, 'lastBatch' will be true.
- *
- * Each document returned will be inserted via the storage interfaceRequest storage
- * interface.
- */
- void _insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& callbackData,
- bool lastBatch);
-
- /**
- * Reports completion status.
- * Commits/aborts collection building.
- * Sets cloner to inactive.
- */
- void _finishCallback(OperationContext* txn, const Status& status);
-
- // Not owned by us.
- ReplicationExecutor* _executor;
-
- HostAndPort _source;
- NamespaceString _sourceNss;
- NamespaceString _destNss;
- CollectionOptions _options;
-
- // Invoked once when cloning completes or fails.
- CallbackFn _onCompletion;
-
- // Not owned by us.
- StorageInterface* _storageInterface;
-
- // Protects member data of this collection cloner.
- mutable stdx::mutex _mutex;
+class CollectionCloner : public BaseCloner {
+ MONGO_DISALLOW_COPYING(CollectionCloner);
- mutable stdx::condition_variable _condition;
+public:
+ /**
+ * Storage interface for collection cloner.
+ *
+ * Supports the operations on the storage layer required by the cloner.
+ */
+ class StorageInterface;
+
+ /**
+ * Type of function to schedule database work with the executor.
+ *
+ * Must be consistent with ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock().
+ *
+ * Used for testing only.
+ */
+ using ScheduleDbWorkFn = stdx::function<StatusWith<ReplicationExecutor::CallbackHandle>(
+ const ReplicationExecutor::CallbackFn&)>;
+
+ /**
+ * Creates CollectionCloner task in inactive state. Use start() to activate cloner.
+ *
+ * The cloner calls 'onCompletion' when the collection cloning has completed or failed.
+ *
+ * 'onCompletion' will be called exactly once.
+ *
+ * Takes ownership of the passed StorageInterface object.
+ */
+ CollectionCloner(ReplicationExecutor* executor,
+ const HostAndPort& source,
+ const NamespaceString& sourceNss,
+ const CollectionOptions& options,
+ const CallbackFn& onCompletion,
+ StorageInterface* storageInterface);
+
+ virtual ~CollectionCloner();
+
+ const NamespaceString& getSourceNamespace() const;
+
+ std::string getDiagnosticString() const override;
+
+ bool isActive() const override;
+
+ Status start() override;
- // _active is true when Collection Cloner is started.
- bool _active;
+ void cancel() override;
- // Fetcher instances for running listIndexes and find commands.
- Fetcher _listIndexesFetcher;
- Fetcher _findFetcher;
+ void wait() override;
- std::vector<BSONObj> _indexSpecs;
-
- // Current batch of documents read from fetcher to insert into collection.
- std::vector<BSONObj> _documents;
-
- // Callback handle for database worker.
- ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle;
-
- // Function for scheduling database work using the executor.
- ScheduleDbWorkFn _scheduleDbWorkFn;
-
- };
+ //
+ // Testing only functions below.
+ //
/**
- * Storage interface used by the collection cloner to build a collection.
+ * Waits for database worker to complete.
+ * Returns immediately if collection cloner is not active.
*
- * Operation context is provided by the replication executor via the cloner.
+ * For testing only.
+ */
+ void waitForDbWorker();
+
+ /**
+ * Overrides how executor schedules database work.
*
- * The storage interface is expected to acquire locks on any resources it needs
- * to perform any of its functions.
+ * For testing only.
+ */
+ void setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn);
+
+private:
+ /**
+ * Read index specs from listIndexes result.
+ */
+ void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob);
+
+ /**
+ * Read collection documents from find result.
+ */
+ void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob);
+
+ /**
+ * Request storage interface to create collection.
+ *
+ * Called multiple times if there are more than one batch of responses from listIndexes
+ * cursor.
*
- * TODO: Consider having commit/abort/cancel functions.
+ * 'nextAction' is an in/out arg indicating the next action planned and to be taken
+ * by the fetcher.
*/
- class CollectionCloner::StorageInterface {
- public:
-
- virtual ~StorageInterface() = default;
-
- /**
- * Creates a collection with the provided indexes.
- *
- * Assume that no database locks have been acquired prior to calling this
- * function.
- */
- virtual Status beginCollection(OperationContext* txn,
- const NamespaceString& nss,
- const CollectionOptions& options,
- const std::vector<BSONObj>& indexSpecs) = 0;
-
- /**
- * Inserts documents into a collection.
- *
- * Assume that no database locks have been acquired prior to calling this
- * function.
- */
- virtual Status insertDocuments(OperationContext* txn,
- const NamespaceString& nss,
- const std::vector<BSONObj>& documents) = 0;
-
- /**
- * Commits changes to collection. No effect if collection building has not begun.
- * Operation context could be null.
- */
- virtual Status commitCollection(OperationContext* txn,
- const NamespaceString& nss) = 0;
-
- /**
- * Inserts missing document into a collection (not related to insertDocuments above),
- * during initial sync retry logic
- */
- virtual Status insertMissingDoc(OperationContext* txn,
- const NamespaceString& nss,
- const BSONObj& doc) = 0;
-
- /**
- * Inserts missing document into a collection (not related to insertDocuments above),
- * during initial sync retry logic
- */
- virtual Status dropUserDatabases(OperationContext* txn) = 0;
-
- };
-
-} // namespace repl
-} // namespace mongo
+ void _beginCollectionCallback(const ReplicationExecutor::CallbackArgs& callbackData);
+
+ /**
+ * Called multiple times if there are more than one batch of documents from the fetcher.
+ * On the last batch, 'lastBatch' will be true.
+ *
+ * Each document returned will be inserted via the storage interfaceRequest storage
+ * interface.
+ */
+ void _insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& callbackData,
+ bool lastBatch);
+
+ /**
+ * Reports completion status.
+ * Commits/aborts collection building.
+ * Sets cloner to inactive.
+ */
+ void _finishCallback(OperationContext* txn, const Status& status);
+
+ // Not owned by us.
+ ReplicationExecutor* _executor;
+
+ HostAndPort _source;
+ NamespaceString _sourceNss;
+ NamespaceString _destNss;
+ CollectionOptions _options;
+
+ // Invoked once when cloning completes or fails.
+ CallbackFn _onCompletion;
+
+ // Not owned by us.
+ StorageInterface* _storageInterface;
+
+ // Protects member data of this collection cloner.
+ mutable stdx::mutex _mutex;
+
+ mutable stdx::condition_variable _condition;
+
+ // _active is true when Collection Cloner is started.
+ bool _active;
+
+ // Fetcher instances for running listIndexes and find commands.
+ Fetcher _listIndexesFetcher;
+ Fetcher _findFetcher;
+
+ std::vector<BSONObj> _indexSpecs;
+
+ // Current batch of documents read from fetcher to insert into collection.
+ std::vector<BSONObj> _documents;
+
+ // Callback handle for database worker.
+ ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle;
+
+ // Function for scheduling database work using the executor.
+ ScheduleDbWorkFn _scheduleDbWorkFn;
+};
+
+/**
+ * Storage interface used by the collection cloner to build a collection.
+ *
+ * Operation context is provided by the replication executor via the cloner.
+ *
+ * The storage interface is expected to acquire locks on any resources it needs
+ * to perform any of its functions.
+ *
+ * TODO: Consider having commit/abort/cancel functions.
+ */
+class CollectionCloner::StorageInterface {
+public:
+ virtual ~StorageInterface() = default;
+
+ /**
+ * Creates a collection with the provided indexes.
+ *
+ * Assume that no database locks have been acquired prior to calling this
+ * function.
+ */
+ virtual Status beginCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const std::vector<BSONObj>& indexSpecs) = 0;
+
+ /**
+ * Inserts documents into a collection.
+ *
+ * Assume that no database locks have been acquired prior to calling this
+ * function.
+ */
+ virtual Status insertDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& documents) = 0;
+
+ /**
+ * Commits changes to collection. No effect if collection building has not begun.
+ * Operation context could be null.
+ */
+ virtual Status commitCollection(OperationContext* txn, const NamespaceString& nss) = 0;
+
+ /**
+ * Inserts missing document into a collection (not related to insertDocuments above),
+ * during initial sync retry logic
+ */
+ virtual Status insertMissingDoc(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) = 0;
+
+ /**
+ * Inserts missing document into a collection (not related to insertDocuments above),
+ * during initial sync retry logic
+ */
+ virtual Status dropUserDatabases(OperationContext* txn) = 0;
+};
+
+} // namespace repl
+} // namespace mongo