summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/data_replicator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/data_replicator.cpp')
-rw-r--r--src/mongo/db/repl/data_replicator.cpp80
1 files changed, 2 insertions, 78 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 8e71942ec54..d0caa1eebd6 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -37,7 +37,9 @@
#include <thread>
#include "mongo/base/status.h"
+#include "mongo/client/query_fetcher.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
@@ -92,43 +94,6 @@ namespace {
} // namespace
/**
- * Follows the fetcher pattern for a find+getmore
- */
- class QueryFetcher {
- MONGO_DISALLOW_COPYING(QueryFetcher);
- public:
- using CallbackFn = stdx::function<void (const BatchDataStatus&, NextAction*)>;
-
- QueryFetcher(ReplicationExecutor* exec,
- const HostAndPort& source,
- const NamespaceString& nss,
- const BSONObj& cmdBSON,
- const QueryFetcher::CallbackFn& onBatchAvailable);
- virtual ~QueryFetcher() = default;
-
- bool isActive() const { return _fetcher.isActive(); }
- Status schedule() { return _fetcher.schedule(); }
- void cancel() { return _fetcher.cancel(); }
- void wait() { if (_fetcher.isActive()) _fetcher.wait(); }
- std::string toString() const;
-
- protected:
- void _onFetchCallback(const BatchDataStatus& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob);
-
- virtual void _delegateCallback(const BatchDataStatus& fetchResult,
- NextAction* nextAction) {
- _work(fetchResult, nextAction);
- };
-
- ReplicationExecutor* _exec;
- Fetcher _fetcher;
- int _responses;
- const QueryFetcher::CallbackFn _work;
- };
-
- /**
* Follows the fetcher pattern for a find+getmore on an oplog
* Returns additional errors if the start oplog entry cannot be found.
*/
@@ -156,47 +121,6 @@ namespace {
const Timestamp _startTS;
};
- // QueryFetcher
- QueryFetcher::QueryFetcher(ReplicationExecutor* exec,
- const HostAndPort& src,
- const NamespaceString& nss,
- const BSONObj& cmdBSON,
- const CallbackFn& work)
- : _exec(exec),
- _fetcher(exec,
- src,
- nss.db().toString(),
- cmdBSON,
- stdx::bind(&QueryFetcher::_onFetchCallback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3)),
- _responses(0),
- _work(work) {
- }
-
- void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- ++_responses;
-
- _delegateCallback(fetchResult, nextAction);
- // The fetcher will continue to call with kGetMore until an error or the last batch.
- if (fetchResult.isOK() && *nextAction == NextAction::kGetMore) {
- const auto batchData(fetchResult.getValue());
- invariant(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
- }
- }
-
- std::string QueryFetcher::toString() const {
- return str::stream() << "QueryFetcher -"
- << " responses: " << _responses
- << " fetcher: " << _fetcher.getDiagnosticString();
- }
-
// OplogFetcher
OplogFetcher::OplogFetcher(ReplicationExecutor* exec,
const Timestamp& startTS,