summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-06-19 15:53:06 -0400
committerBenety Goh <benety@mongodb.com>2015-06-19 16:49:37 -0400
commitc43019679a54aca9e729128193fb15c5f860d0e0 (patch)
tree7a1a2214a84a12eff556291ba7a2918de66928ac
parentd09fdf2b5887bfd6773f1a6da374a1ff7fc0e242 (diff)
downloadmongo-c43019679a54aca9e729128193fb15c5f860d0e0.tar.gz
SERVER-18036 renamed Fetcher::BatchData to QueryResponse
Fetcher callback semantics also updated so that when NextAction is NoAction, the follow up command BSON builder will be null.
-rw-r--r--src/mongo/client/fetcher.cpp42
-rw-r--r--src/mongo/client/fetcher.h25
-rw-r--r--src/mongo/client/fetcher_test.cpp72
-rw-r--r--src/mongo/client/query_fetcher.cpp18
-rw-r--r--src/mongo/client/query_fetcher.h22
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp4
-rw-r--r--src/mongo/db/repl/collection_cloner.h4
-rw-r--r--src/mongo/db/repl/data_replicator.cpp51
-rw-r--r--src/mongo/db/repl/data_replicator.h8
-rw-r--r--src/mongo/db/repl/database_cloner.cpp2
-rw-r--r--src/mongo/db/repl/database_cloner.h2
-rw-r--r--src/mongo/s/client/shard_registry.cpp2
12 files changed, 171 insertions, 81 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp
index 2760b092bd4..f11ced0ce2f 100644
--- a/src/mongo/client/fetcher.cpp
+++ b/src/mongo/client/fetcher.cpp
@@ -54,7 +54,7 @@ namespace {
*/
Status parseCursorResponse(const BSONObj& obj,
const std::string& batchFieldName,
- Fetcher::BatchData* batchData) {
+ Fetcher::QueryResponse* batchData) {
invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName);
invariant(batchData);
@@ -136,9 +136,9 @@ namespace {
} // namespace
- Fetcher::BatchData::BatchData(CursorId theCursorId,
- const NamespaceString& theNss,
- Documents theDocuments)
+ Fetcher::QueryResponse::QueryResponse(CursorId theCursorId,
+ const NamespaceString& theNss,
+ Documents theDocuments)
: cursorId(theCursorId),
nss(theNss),
documents(theDocuments) { }
@@ -234,7 +234,7 @@ namespace {
const char* batchFieldName) {
if (!rcbd.response.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(rcbd.response.getStatus()), nullptr, nullptr);
+ _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.getStatus()), nullptr, nullptr);
_finishCallback();
return;
}
@@ -242,49 +242,61 @@ namespace {
const BSONObj& queryResponseObj = rcbd.response.getValue().data;
Status status = getStatusFromCommandResult(queryResponseObj);
if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
+ _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr);
_finishCallback();
return;
}
status = parseReplResponse(queryResponseObj);
if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
+ _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr);
_finishCallback();
return;
}
- BatchData batchData;
+ QueryResponse batchData;
status = parseCursorResponse(queryResponseObj, batchFieldName, &batchData);
if (!status.isOK()) {
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
+ _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr);
_finishCallback();
return;
}
NextAction nextAction = NextAction::kNoAction;
- if (batchData.cursorId) {
- nextAction = NextAction::kGetMore;
+ if (!batchData.cursorId) {
+ _work(StatusWith<QueryResponse>(batchData), &nextAction, nullptr);
+ _finishCallback();
+ return;
}
+ nextAction = NextAction::kGetMore;
+
BSONObjBuilder bob;
- _work(StatusWith<BatchData>(batchData), &nextAction, &bob);
+ _work(StatusWith<QueryResponse>(batchData), &nextAction, &bob);
// Callback function _work may modify nextAction to request the fetcher
// not to schedule a getMore command.
- if (!batchData.cursorId || nextAction != NextAction::kGetMore) {
+ if (nextAction != NextAction::kGetMore) {
+ _finishCallback();
+ return;
+ }
+
+ // Callback function may also disable the fetching of additional data by not filling in the
+ // BSONObjBuilder for the getMore command.
+ auto cmdObj = bob.obj();
+ if (cmdObj.isEmpty()) {
_finishCallback();
return;
}
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- status = _schedule_inlock(bob.obj(), kNextBatchFieldName);
+ status = _schedule_inlock(cmdObj, kNextBatchFieldName);
}
if (!status.isOK()) {
nextAction = NextAction::kNoAction;
- _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr);
+ _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr);
_finishCallback();
return;
}
diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h
index 32b1c5aa8cb..53be407b1af 100644
--- a/src/mongo/client/fetcher.h
+++ b/src/mongo/client/fetcher.h
@@ -54,17 +54,25 @@ namespace mongo {
typedef std::vector<BSONObj> Documents;
/**
- * Documents in current batch with cursor ID and associated namespace name.
+ * Documents in current query response with cursor ID and associated namespace name.
* If cursor ID is zero, there are no additional batches.
*/
- struct BatchData {
- BatchData() = default;
- BatchData(CursorId theCursorId, const NamespaceString& theNss, Documents theDocuments);
+ struct QueryResponse {
+ QueryResponse() = default;
+ QueryResponse(CursorId theCursorId,
+ const NamespaceString& theNss,
+ Documents theDocuments);
CursorId cursorId = 0;
NamespaceString nss;
Documents documents;
+ // TODO: fill in with replication metadata.
+ struct OtherFields {
+ BSONObj metadata;
+ } otherFields;
};
+ using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>;
+
/**
* Represents next steps of fetcher.
*/
@@ -77,7 +85,7 @@ namespace mongo {
/**
* Type of a fetcher callback function.
*/
- typedef stdx::function<void (const StatusWith<BatchData>&,
+ typedef stdx::function<void (const StatusWith<QueryResponse>&,
NextAction*,
BSONObjBuilder*)> CallbackFn;
@@ -102,8 +110,11 @@ namespace mongo {
* If the fetcher is canceled (either by calling cancel() or shutting down the executor),
* 'work' will not be invoked.
*
- * Fetcher uses the NextAction argument to inform client via callback if a getMore command
- * will be scheduled to be run by the executor to retrieve additional results.
+ * Fetcher uses the NextAction and BSONObjBuilder arguments to inform client via callback
+ * if a follow-up (like getMore) command will be scheduled to be run by the executor to
+ * retrieve additional results. The BSONObjBuilder pointer will be valid only if NextAction
+ * is kGetMore.
+ * Otherwise, the BSONObjBuilder pointer will be null.
* Also, note that the NextAction is both an input and output argument to allow
* the client to suggest a different action for the fetcher to take post-callback.
*
diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp
index 204e89892d2..86a7a105658 100644
--- a/src/mongo/client/fetcher_test.cpp
+++ b/src/mongo/client/fetcher_test.cpp
@@ -64,15 +64,15 @@ namespace {
Status status;
CursorId cursorId;
+ NamespaceString nss;
Fetcher::Documents documents;
Fetcher::NextAction nextAction;
- Fetcher::NextAction newNextAction;
std::unique_ptr<Fetcher> fetcher;
// Called at end of _callback
Fetcher::CallbackFn callbackHook;
private:
- void _callback(const StatusWith<Fetcher::BatchData>& result,
+ void _callback(const StatusWith<Fetcher::QueryResponse>& result,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob);
};
@@ -101,8 +101,10 @@ namespace {
void FetcherTest::clear() {
status = getDetectableErrorStatus();
cursorId = -1;
+ nss = NamespaceString();
documents.clear();
nextAction = Fetcher::NextAction::kInvalid;
+ callbackHook = Fetcher::CallbackFn();
}
void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) {
@@ -140,13 +142,14 @@ namespace {
ASSERT_FALSE(fetcher->isActive());
}
- void FetcherTest::_callback(const StatusWith<Fetcher::BatchData>& result,
+ void FetcherTest::_callback(const StatusWith<Fetcher::QueryResponse>& result,
Fetcher::NextAction* nextActionFromFetcher,
BSONObjBuilder* getMoreBob) {
status = result.getStatus();
if (result.isOK()) {
- const Fetcher::BatchData& batchData = result.getValue();
+ const Fetcher::QueryResponse& batchData = result.getValue();
cursorId = batchData.cursorId;
+ nss = batchData.nss;
documents = batchData.documents;
}
@@ -159,7 +162,7 @@ namespace {
}
}
- void unusedFetcherCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void unusedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
FAIL("should not reach here");
@@ -376,6 +379,8 @@ namespace {
"firstBatch" << BSONArray()) <<
"ok" << 1));
ASSERT_OK(status);
+ ASSERT_EQUALS(0, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_TRUE(documents.empty());
}
@@ -388,6 +393,7 @@ namespace {
"ok" << 1));
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
}
@@ -395,17 +401,15 @@ namespace {
TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) {
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
- callbackHook = [](const StatusWith<Fetcher::BatchData>& fetchResult,
+ callbackHook = [](const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
ASSERT_OK(fetchResult.getStatus());
- Fetcher::BatchData batchData{fetchResult.getValue()};
+ Fetcher::QueryResponse batchData{fetchResult.getValue()};
ASSERT(nextAction);
*nextAction = Fetcher::NextAction::kGetMore;
- ASSERT(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
+ ASSERT_FALSE(getMoreBob);
};
processNetworkResponse(BSON("cursor" << BSON("id" << 0LL <<
"ns" << "db.coll" <<
@@ -413,11 +417,25 @@ namespace {
"ok" << 1));
ASSERT_OK(status);
ASSERT_EQUALS(0, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
}
+ void appendGetMoreRequest(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ if (!getMoreBob) {
+ return;
+ }
+ const auto& batchData = fetchResult.getValue();
+ getMoreBob->append("getMore", batchData.cursorId);
+ getMoreBob->append("collection", batchData.nss.coll());
+ }
+
TEST_F(FetcherTest, FetchMultipleBatches) {
+ callbackHook = appendGetMoreRequest;
+
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
@@ -426,6 +444,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -439,6 +459,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -452,6 +474,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(0, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc3, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
@@ -461,6 +485,8 @@ namespace {
}
TEST_F(FetcherTest, ScheduleGetMoreAndCancel) {
+ callbackHook = appendGetMoreRequest;
+
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
@@ -469,6 +495,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -482,6 +510,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -493,6 +523,8 @@ namespace {
}
TEST_F(FetcherTest, ScheduleGetMoreButShutdown) {
+ callbackHook = appendGetMoreRequest;
+
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
@@ -501,6 +533,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -514,6 +548,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -524,13 +560,15 @@ namespace {
ASSERT_NOT_OK(status);
}
- void setNextActionToNoAction(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void setNextActionToNoAction(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
*nextAction = Fetcher::NextAction::kNoAction;
}
TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) {
+ callbackHook = appendGetMoreRequest;
+
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
@@ -539,6 +577,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
@@ -555,6 +595,8 @@ namespace {
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc2, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction);
@@ -564,7 +606,7 @@ namespace {
/**
* This will be invoked twice before the fetcher returns control to the replication executor.
*/
- void shutdownDuringSecondBatch(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void shutdownDuringSecondBatch(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob,
const BSONObj& doc2,
@@ -575,7 +617,7 @@ namespace {
// First time during second batch
ASSERT_OK(fetchResult.getStatus());
- Fetcher::BatchData batchData{fetchResult.getValue()};
+ Fetcher::QueryResponse batchData{fetchResult.getValue()};
ASSERT_EQUALS(1U, batchData.documents.size());
ASSERT_EQUALS(doc2, batchData.documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == *nextAction);
@@ -588,6 +630,8 @@ namespace {
}
TEST_F(FetcherTest, ShutdownDuringSecondBatch) {
+ callbackHook = appendGetMoreRequest;
+
ASSERT_OK(fetcher->schedule());
const BSONObj doc = BSON("_id" << 1);
scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL <<
@@ -596,6 +640,8 @@ namespace {
"ok" << 1));
getNet()->runReadyNetworkOperations();
ASSERT_OK(status);
+ ASSERT_EQUALS(1LL, cursorId);
+ ASSERT_EQUALS("db.coll", nss.ns());
ASSERT_EQUALS(1U, documents.size());
ASSERT_EQUALS(doc, documents.front());
ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction);
diff --git a/src/mongo/client/query_fetcher.cpp b/src/mongo/client/query_fetcher.cpp
index c0a8ed754d7..e6d765cbad8 100644
--- a/src/mongo/client/query_fetcher.cpp
+++ b/src/mongo/client/query_fetcher.cpp
@@ -52,7 +52,11 @@ namespace mongo {
}
- void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult,
+ int QueryFetcher::_getResponses() const {
+ return _responses;
+ }
+
+ void QueryFetcher::_onFetchCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
@@ -69,7 +73,17 @@ namespace mongo {
}
}
- std::string QueryFetcher::toString() const {
+ void QueryFetcher::_onQueryResponse(const Fetcher::QueryResponseStatus& fetchResult,
+ Fetcher::NextAction* nextAction) {
+ _work(fetchResult, nextAction);
+ }
+
+ void QueryFetcher::_delegateCallback(const Fetcher::QueryResponseStatus& fetchResult,
+ Fetcher::NextAction* nextAction) {
+ _onQueryResponse(fetchResult, nextAction);
+ };
+
+ std::string QueryFetcher::getDiagnosticString() const {
return str::stream() << "QueryFetcher -"
<< " responses: " << _responses
<< " fetcher: " << _fetcher.getDiagnosticString();
diff --git a/src/mongo/client/query_fetcher.h b/src/mongo/client/query_fetcher.h
index 21f7461fb43..3ac11191e02 100644
--- a/src/mongo/client/query_fetcher.h
+++ b/src/mongo/client/query_fetcher.h
@@ -50,8 +50,8 @@ namespace mongo {
class QueryFetcher {
MONGO_DISALLOW_COPYING(QueryFetcher);
public:
- using BatchDataStatus = StatusWith<Fetcher::BatchData>;
- using CallbackFn = stdx::function<void (const BatchDataStatus&, Fetcher::NextAction*)>;
+ using CallbackFn =
+ stdx::function<void (const Fetcher::QueryResponseStatus&, Fetcher::NextAction*)>;
QueryFetcher(executor::TaskExecutor* exec,
const HostAndPort& source,
@@ -64,18 +64,24 @@ namespace mongo {
Status schedule() { return _fetcher.schedule(); }
void cancel() { return _fetcher.cancel(); }
void wait() { if (_fetcher.isActive()) _fetcher.wait(); }
- std::string toString() const;
+ std::string getDiagnosticString() const;
protected:
- void _onFetchCallback(const BatchDataStatus& fetchResult,
+ int _getResponses() const;
+ void _onFetchCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob);
- virtual void _delegateCallback(const BatchDataStatus& fetchResult,
- Fetcher::NextAction* nextAction) {
- _work(fetchResult, nextAction);
- };
+ /**
+ * Called by _delegateCallback() to forward query results to '_work'.
+ */
+ void _onQueryResponse(const Fetcher::QueryResponseStatus& fetchResult,
+ Fetcher::NextAction* nextAction);
+ virtual void _delegateCallback(const Fetcher::QueryResponseStatus& fetchResult,
+ Fetcher::NextAction* nextAction);
+
+ private:
executor::TaskExecutor* _exec;
Fetcher _fetcher;
int _responses;
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index e27fa77c7ec..5abe3c2ed84 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -185,7 +185,7 @@ namespace repl {
_scheduleDbWorkFn = scheduleDbWorkFn;
}
- void CollectionCloner::_listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
if (!fetchResult.isOK()) {
@@ -224,7 +224,7 @@ namespace repl {
_dbWorkCallbackHandle = scheduleResult.getValue();
}
- void CollectionCloner::_findCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
if (!fetchResult.isOK()) {
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 19ff6850163..69f3caa1f18 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -123,14 +123,14 @@ namespace repl {
/**
* Read index specs from listIndexes result.
*/
- void _listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob);
/**
* Read collection documents from find result.
*/
- void _findCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob);
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 287f81bca30..0344608dc78 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -114,7 +114,7 @@ std::string toString(DataReplicatorState s) {
protected:
- void _delegateCallback(const BatchDataStatus& fetchResult,
+ void _delegateCallback(const Fetcher::QueryResponseStatus& fetchResult,
NextAction* nextAction);
const Timestamp _startTS;
@@ -139,13 +139,12 @@ std::string toString(DataReplicatorState s) {
std::string OplogFetcher::toString() const {
return str::stream() << "OplogReader -"
<< " startTS: " << _startTS.toString()
- << " responses: " << _responses
- << " fetcher: " << _fetcher.getDiagnosticString();
+ << " fetcher: " << QueryFetcher::getDiagnosticString();
}
- void OplogFetcher::_delegateCallback(const BatchDataStatus& fetchResult,
+ void OplogFetcher::_delegateCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction) {
- const bool checkStartTS = _responses == 0;
+ const bool checkStartTS = _getResponses() == 0;
if (fetchResult.isOK()) {
Fetcher::Documents::const_iterator firstDoc = fetchResult.getValue().documents.begin();
@@ -155,40 +154,42 @@ std::string toString(DataReplicatorState s) {
if (!hasDoc) {
// Set next action to none.
*nextAction = Fetcher::NextAction::kNoAction;
- _work(Status(ErrorCodes::OplogStartMissing, str::stream() <<
- "No operations on sync source with op time starting at: " <<
- _startTS.toString()),
- nextAction);
+ _onQueryResponse(
+ Status(ErrorCodes::OplogStartMissing, str::stream() <<
+ "No operations on sync source with op time starting at: " <<
+ _startTS.toString()),
+ nextAction);
return;
} else if ((*firstDoc)["ts"].eoo()) {
// Set next action to none.
*nextAction = Fetcher::NextAction::kNoAction;
- _work(Status(ErrorCodes::OplogStartMissing, str::stream() <<
- "Missing 'ts' field in first returned " << (*firstDoc)["ts"] <<
- " starting at " << _startTS.toString()),
- nextAction);
+ _onQueryResponse(
+ Status(ErrorCodes::OplogStartMissing, str::stream() <<
+ "Missing 'ts' field in first returned " << (*firstDoc)["ts"] <<
+ " starting at " << _startTS.toString()),
+ nextAction);
return;
} else if ((*firstDoc)["ts"].timestamp() != _startTS) {
// Set next action to none.
*nextAction = Fetcher::NextAction::kNoAction;
- _work(Status(ErrorCodes::OplogStartMissing,
- str::stream() << "First returned " << (*firstDoc)["ts"]
- << " is not where we wanted to start: "
- << _startTS.toString()),
- nextAction);
+ _onQueryResponse(
+ Status(ErrorCodes::OplogStartMissing, str::stream() <<
+ "First returned " << (*firstDoc)["ts"] <<
+ " is not where we wanted to start: " << _startTS.toString()),
+ nextAction);
return;
}
}
if (hasDoc) {
- _work(fetchResult, nextAction);
+ _onQueryResponse(fetchResult, nextAction);
}
else {
}
}
else {
- _work(fetchResult, nextAction);
+ _onQueryResponse(fetchResult, nextAction);
}
};
@@ -321,7 +322,7 @@ std::string toString(DataReplicatorState s) {
const NamespaceString& oplogNS);
void setStatus(const Status& s);
void setStatus(const CBHStatus& s);
- void _setTimestampStatus(const BatchDataStatus& fetchResult,
+ void _setTimestampStatus(const QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
TimestampStatus* status) ;
};
@@ -352,7 +353,7 @@ std::string toString(DataReplicatorState s) {
return timestampStatus;
}
- void InitialSyncState::_setTimestampStatus(const BatchDataStatus& fetchResult,
+ void InitialSyncState::_setTimestampStatus(const QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
TimestampStatus* status) {
if (!fetchResult.isOK()) {
@@ -863,7 +864,7 @@ std::string toString(DataReplicatorState s) {
}
}
- void DataReplicator::_onApplierReadyStart(const BatchDataStatus& fetchResult,
+ void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult,
NextAction* nextAction) {
// Data clone done, move onto apply.
TimestampStatus ts(ErrorCodes::OplogStartMissing, "");
@@ -1096,7 +1097,7 @@ std::string toString(DataReplicatorState s) {
}
}
- void DataReplicator::_onMissingFetched(const BatchDataStatus& fetchResult,
+ void DataReplicator::_onMissingFetched(const QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
const Operations& ops,
const NamespaceString nss) {
@@ -1290,7 +1291,7 @@ std::string toString(DataReplicatorState s) {
return status;
}
- void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction) {
const Status status = fetchResult.getStatus();
if (status.code() == ErrorCodes::CallbackCanceled)
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index aa347ae7dd2..8f1717db09a 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -54,7 +54,7 @@ class QueryFetcher;
namespace repl {
using Operations = Applier::Operations;
-using BatchDataStatus = StatusWith<Fetcher::BatchData>;
+using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>;
using CallbackArgs = ReplicationExecutor::CallbackArgs;
using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>;
using CommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs;
@@ -199,7 +199,7 @@ private:
// Only executed via executor
void _resumeFinish(CallbackArgs cbData);
- void _onOplogFetchFinish(const BatchDataStatus& fetchResult,
+ void _onOplogFetchFinish(const QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction);
void _doNextActions();
void _doNextActions_InitialSync_inlock();
@@ -219,14 +219,14 @@ private:
void _handleFailedApplyBatch(const TimestampStatus&, const Operations&);
// Fetches the last doc from the first operation, and reschedules the apply for the ops.
void _scheduleApplyAfterFetch(const Operations&);
- void _onMissingFetched(const BatchDataStatus& fetchResult,
+ void _onMissingFetched(const QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
const Operations& ops,
const NamespaceString nss);
void _onDataClonerFinish(const Status& status);
// Called after _onDataClonerFinish when the new Timestamp is avail, to use for minvalid
- void _onApplierReadyStart(const BatchDataStatus& fetchResult,
+ void _onApplierReadyStart(const QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction);
Status _scheduleApplyBatch();
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index dc0f6f8990a..b96ab403169 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -186,7 +186,7 @@ namespace {
_startCollectionCloner = startCollectionCloner;
}
- void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::BatchData>& result,
+ void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& result,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h
index 071b33d0e96..f1171bf5e4b 100644
--- a/src/mongo/db/repl/database_cloner.h
+++ b/src/mongo/db/repl/database_cloner.h
@@ -138,7 +138,7 @@ namespace repl {
/**
* Read collection names and options from listCollections result.
*/
- void _listCollectionsCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
+ void _listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob);
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index ae0edd39146..01efc45aaa4 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -258,7 +258,7 @@ namespace {
Status status = Status(ErrorCodes::InternalError, "Internal error running find command");
vector<BSONObj> results;
- auto fetcherCallback = [&status, &results](const QueryFetcher::BatchDataStatus& dataStatus,
+ auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus,
Fetcher::NextAction* nextAction) {
// Throw out any accumulated results on error