diff options
Diffstat (limited to 'src/mongo/db/repl/data_replicator.cpp')
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 80 |
1 files changed, 2 insertions, 78 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 8e71942ec54..d0caa1eebd6 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -37,7 +37,9 @@ #include <thread> #include "mongo/base/status.h" +#include "mongo/client/query_fetcher.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" @@ -92,43 +94,6 @@ namespace { } // namespace /** - * Follows the fetcher pattern for a find+getmore - */ - class QueryFetcher { - MONGO_DISALLOW_COPYING(QueryFetcher); - public: - using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>; - - QueryFetcher(ReplicationExecutor* exec, - const HostAndPort& source, - const NamespaceString& nss, - const BSONObj& cmdBSON, - const QueryFetcher::CallbackFn& onBatchAvailable); - virtual ~QueryFetcher() = default; - - bool isActive() const { return _fetcher.isActive(); } - Status schedule() { return _fetcher.schedule(); } - void cancel() { return _fetcher.cancel(); } - void wait() { if (_fetcher.isActive()) _fetcher.wait(); } - std::string toString() const; - - protected: - void _onFetchCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob); - - virtual void _delegateCallback(const BatchDataStatus& fetchResult, - NextAction* nextAction) { - _work(fetchResult, nextAction); - }; - - ReplicationExecutor* _exec; - Fetcher _fetcher; - int _responses; - const QueryFetcher::CallbackFn _work; - }; - - /** * Follows the fetcher pattern for a find+getmore on an oplog * Returns additional errors if the start oplog entry cannot be found. */ @@ -156,47 +121,6 @@ namespace { const Timestamp _startTS; }; - // QueryFetcher - QueryFetcher::QueryFetcher(ReplicationExecutor* exec, - const HostAndPort& src, - const NamespaceString& nss, - const BSONObj& cmdBSON, - const CallbackFn& work) - : _exec(exec), - _fetcher(exec, - src, - nss.db().toString(), - cmdBSON, - stdx::bind(&QueryFetcher::_onFetchCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)), - _responses(0), - _work(work) { - } - - void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - ++_responses; - - _delegateCallback(fetchResult, nextAction); - // The fetcher will continue to call with kGetMore until an error or the last batch. - if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) { - const auto batchData(fetchResult.getValue()); - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - } - } - - std::string QueryFetcher::toString() const { - return str::stream() << "QueryFetcher -" - << " responses: " << _responses - << " fetcher: " << _fetcher.getDiagnosticString(); - } - // OplogFetcher OplogFetcher::OplogFetcher(ReplicationExecutor* exec, const Timestamp& startTS, |