diff options
author | Benety Goh <benety@mongodb.com> | 2015-06-19 15:53:06 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-06-19 16:49:37 -0400 |
commit | c43019679a54aca9e729128193fb15c5f860d0e0 (patch) | |
tree | 7a1a2214a84a12eff556291ba7a2918de66928ac /src | |
parent | d09fdf2b5887bfd6773f1a6da374a1ff7fc0e242 (diff) | |
download | mongo-c43019679a54aca9e729128193fb15c5f860d0e0.tar.gz |
SERVER-18036 renamed Fetcher::BatchData to QueryResponse
Fetcher callback semantics also updated so that when NextAction is NoAction,
the follow up command BSON builder will be null.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/client/fetcher.cpp | 42 | ||||
-rw-r--r-- | src/mongo/client/fetcher.h | 25 | ||||
-rw-r--r-- | src/mongo/client/fetcher_test.cpp | 72 | ||||
-rw-r--r-- | src/mongo/client/query_fetcher.cpp | 18 | ||||
-rw-r--r-- | src/mongo/client/query_fetcher.h | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.h | 2 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 2 |
12 files changed, 171 insertions, 81 deletions
diff --git a/src/mongo/client/fetcher.cpp b/src/mongo/client/fetcher.cpp index 2760b092bd4..f11ced0ce2f 100644 --- a/src/mongo/client/fetcher.cpp +++ b/src/mongo/client/fetcher.cpp @@ -54,7 +54,7 @@ namespace { */ Status parseCursorResponse(const BSONObj& obj, const std::string& batchFieldName, - Fetcher::BatchData* batchData) { + Fetcher::QueryResponse* batchData) { invariant(batchFieldName == kFirstBatchFieldName || batchFieldName == kNextBatchFieldName); invariant(batchData); @@ -136,9 +136,9 @@ namespace { } // namespace - Fetcher::BatchData::BatchData(CursorId theCursorId, - const NamespaceString& theNss, - Documents theDocuments) + Fetcher::QueryResponse::QueryResponse(CursorId theCursorId, + const NamespaceString& theNss, + Documents theDocuments) : cursorId(theCursorId), nss(theNss), documents(theDocuments) { } @@ -234,7 +234,7 @@ namespace { const char* batchFieldName) { if (!rcbd.response.isOK()) { - _work(StatusWith<Fetcher::BatchData>(rcbd.response.getStatus()), nullptr, nullptr); + _work(StatusWith<Fetcher::QueryResponse>(rcbd.response.getStatus()), nullptr, nullptr); _finishCallback(); return; } @@ -242,49 +242,61 @@ namespace { const BSONObj& queryResponseObj = rcbd.response.getValue().data; Status status = getStatusFromCommandResult(queryResponseObj); if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); _finishCallback(); return; } status = parseReplResponse(queryResponseObj); if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); _finishCallback(); return; } - BatchData batchData; + QueryResponse batchData; status = parseCursorResponse(queryResponseObj, batchFieldName, &batchData); if (!status.isOK()) { - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); _finishCallback(); return; } NextAction nextAction = NextAction::kNoAction; - if (batchData.cursorId) { - nextAction = NextAction::kGetMore; + if (!batchData.cursorId) { + _work(StatusWith<QueryResponse>(batchData), &nextAction, nullptr); + _finishCallback(); + return; } + nextAction = NextAction::kGetMore; + BSONObjBuilder bob; - _work(StatusWith<BatchData>(batchData), &nextAction, &bob); + _work(StatusWith<QueryResponse>(batchData), &nextAction, &bob); // Callback function _work may modify nextAction to request the fetcher // not to schedule a getMore command. - if (!batchData.cursorId || nextAction != NextAction::kGetMore) { + if (nextAction != NextAction::kGetMore) { + _finishCallback(); + return; + } + + // Callback function may also disable the fetching of additional data by not filling in the + // BSONObjBuilder for the getMore command. + auto cmdObj = bob.obj(); + if (cmdObj.isEmpty()) { _finishCallback(); return; } { stdx::lock_guard<stdx::mutex> lk(_mutex); - status = _schedule_inlock(bob.obj(), kNextBatchFieldName); + status = _schedule_inlock(cmdObj, kNextBatchFieldName); } if (!status.isOK()) { nextAction = NextAction::kNoAction; - _work(StatusWith<Fetcher::BatchData>(status), nullptr, nullptr); + _work(StatusWith<Fetcher::QueryResponse>(status), nullptr, nullptr); _finishCallback(); return; } diff --git a/src/mongo/client/fetcher.h b/src/mongo/client/fetcher.h index 32b1c5aa8cb..53be407b1af 100644 --- a/src/mongo/client/fetcher.h +++ b/src/mongo/client/fetcher.h @@ -54,17 +54,25 @@ namespace mongo { typedef std::vector<BSONObj> Documents; /** - * Documents in current batch with cursor ID and associated namespace name. + * Documents in current query response with cursor ID and associated namespace name. * If cursor ID is zero, there are no additional batches. */ - struct BatchData { - BatchData() = default; - BatchData(CursorId theCursorId, const NamespaceString& theNss, Documents theDocuments); + struct QueryResponse { + QueryResponse() = default; + QueryResponse(CursorId theCursorId, + const NamespaceString& theNss, + Documents theDocuments); CursorId cursorId = 0; NamespaceString nss; Documents documents; + // TODO: fill in with replication metadata. + struct OtherFields { + BSONObj metadata; + } otherFields; }; + using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>; + /** * Represents next steps of fetcher. */ @@ -77,7 +85,7 @@ namespace mongo { /** * Type of a fetcher callback function. */ - typedef stdx::function<void (const StatusWith<BatchData>&, + typedef stdx::function<void (const StatusWith<QueryResponse>&, NextAction*, BSONObjBuilder*)> CallbackFn; @@ -102,8 +110,11 @@ namespace mongo { * If the fetcher is canceled (either by calling cancel() or shutting down the executor), * 'work' will not be invoked. * - * Fetcher uses the NextAction argument to inform client via callback if a getMore command - * will be scheduled to be run by the executor to retrieve additional results. + * Fetcher uses the NextAction and BSONObjBuilder arguments to inform client via callback + * if a follow-up (like getMore) command will be scheduled to be run by the executor to + * retrieve additional results. The BSONObjBuilder pointer will be valid only if NextAction + * is kGetMore. + * Otherwise, the BSONObjBuilder pointer will be null. * Also, note that the NextAction is both an input and output argument to allow * the client to suggest a different action for the fetcher to take post-callback. * diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index 204e89892d2..86a7a105658 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -64,15 +64,15 @@ namespace { Status status; CursorId cursorId; + NamespaceString nss; Fetcher::Documents documents; Fetcher::NextAction nextAction; - Fetcher::NextAction newNextAction; std::unique_ptr<Fetcher> fetcher; // Called at end of _callback Fetcher::CallbackFn callbackHook; private: - void _callback(const StatusWith<Fetcher::BatchData>& result, + void _callback(const StatusWith<Fetcher::QueryResponse>& result, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob); }; @@ -101,8 +101,10 @@ namespace { void FetcherTest::clear() { status = getDetectableErrorStatus(); cursorId = -1; + nss = NamespaceString(); documents.clear(); nextAction = Fetcher::NextAction::kInvalid; + callbackHook = Fetcher::CallbackFn(); } void FetcherTest::scheduleNetworkResponse(const BSONObj& obj) { @@ -140,13 +142,14 @@ namespace { ASSERT_FALSE(fetcher->isActive()); } - void FetcherTest::_callback(const StatusWith<Fetcher::BatchData>& result, + void FetcherTest::_callback(const StatusWith<Fetcher::QueryResponse>& result, Fetcher::NextAction* nextActionFromFetcher, BSONObjBuilder* getMoreBob) { status = result.getStatus(); if (result.isOK()) { - const Fetcher::BatchData& batchData = result.getValue(); + const Fetcher::QueryResponse& batchData = result.getValue(); cursorId = batchData.cursorId; + nss = batchData.nss; documents = batchData.documents; } @@ -159,7 +162,7 @@ namespace { } } - void unusedFetcherCallback(const StatusWith<Fetcher::BatchData>& fetchResult, + void unusedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { FAIL("should not reach here"); @@ -376,6 +379,8 @@ namespace { "firstBatch" << BSONArray()) << "ok" << 1)); ASSERT_OK(status); + ASSERT_EQUALS(0, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_TRUE(documents.empty()); } @@ -388,6 +393,7 @@ namespace { "ok" << 1)); ASSERT_OK(status); ASSERT_EQUALS(0, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); } @@ -395,17 +401,15 @@ namespace { TEST_F(FetcherTest, SetNextActionToContinueWhenNextBatchIsNotAvailable) { ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); - callbackHook = [](const StatusWith<Fetcher::BatchData>& fetchResult, + callbackHook = [](const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { ASSERT_OK(fetchResult.getStatus()); - Fetcher::BatchData batchData{fetchResult.getValue()}; + Fetcher::QueryResponse batchData{fetchResult.getValue()}; ASSERT(nextAction); *nextAction = Fetcher::NextAction::kGetMore; - ASSERT(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); + ASSERT_FALSE(getMoreBob); }; processNetworkResponse(BSON("cursor" << BSON("id" << 0LL << "ns" << "db.coll" << @@ -413,11 +417,25 @@ namespace { "ok" << 1)); ASSERT_OK(status); ASSERT_EQUALS(0, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); } + void appendGetMoreRequest(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + if (!getMoreBob) { + return; + } + const auto& batchData = fetchResult.getValue(); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + } + TEST_F(FetcherTest, FetchMultipleBatches) { + callbackHook = appendGetMoreRequest; + ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << @@ -426,6 +444,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -439,6 +459,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -452,6 +474,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(0, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc3, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); @@ -461,6 +485,8 @@ namespace { } TEST_F(FetcherTest, ScheduleGetMoreAndCancel) { + callbackHook = appendGetMoreRequest; + ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << @@ -469,6 +495,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -482,6 +510,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -493,6 +523,8 @@ namespace { } TEST_F(FetcherTest, ScheduleGetMoreButShutdown) { + callbackHook = appendGetMoreRequest; + ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << @@ -501,6 +533,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -514,6 +548,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -524,13 +560,15 @@ namespace { ASSERT_NOT_OK(status); } - void setNextActionToNoAction(const StatusWith<Fetcher::BatchData>& fetchResult, + void setNextActionToNoAction(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { *nextAction = Fetcher::NextAction::kNoAction; } TEST_F(FetcherTest, UpdateNextActionAfterSecondBatch) { + callbackHook = appendGetMoreRequest; + ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << @@ -539,6 +577,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); @@ -555,6 +595,8 @@ namespace { getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc2, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kNoAction == nextAction); @@ -564,7 +606,7 @@ namespace { /** * This will be invoked twice before the fetcher returns control to the replication executor. */ - void shutdownDuringSecondBatch(const StatusWith<Fetcher::BatchData>& fetchResult, + void shutdownDuringSecondBatch(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob, const BSONObj& doc2, @@ -575,7 +617,7 @@ namespace { // First time during second batch ASSERT_OK(fetchResult.getStatus()); - Fetcher::BatchData batchData{fetchResult.getValue()}; + Fetcher::QueryResponse batchData{fetchResult.getValue()}; ASSERT_EQUALS(1U, batchData.documents.size()); ASSERT_EQUALS(doc2, batchData.documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == *nextAction); @@ -588,6 +630,8 @@ namespace { } TEST_F(FetcherTest, ShutdownDuringSecondBatch) { + callbackHook = appendGetMoreRequest; + ASSERT_OK(fetcher->schedule()); const BSONObj doc = BSON("_id" << 1); scheduleNetworkResponse(BSON("cursor" << BSON("id" << 1LL << @@ -596,6 +640,8 @@ namespace { "ok" << 1)); getNet()->runReadyNetworkOperations(); ASSERT_OK(status); + ASSERT_EQUALS(1LL, cursorId); + ASSERT_EQUALS("db.coll", nss.ns()); ASSERT_EQUALS(1U, documents.size()); ASSERT_EQUALS(doc, documents.front()); ASSERT_TRUE(Fetcher::NextAction::kGetMore == nextAction); diff --git a/src/mongo/client/query_fetcher.cpp b/src/mongo/client/query_fetcher.cpp index c0a8ed754d7..e6d765cbad8 100644 --- a/src/mongo/client/query_fetcher.cpp +++ b/src/mongo/client/query_fetcher.cpp @@ -52,7 +52,11 @@ namespace mongo { } - void QueryFetcher::_onFetchCallback(const BatchDataStatus& fetchResult, + int QueryFetcher::_getResponses() const { + return _responses; + } + + void QueryFetcher::_onFetchCallback(const Fetcher::QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { @@ -69,7 +73,17 @@ namespace mongo { } } - std::string QueryFetcher::toString() const { + void QueryFetcher::_onQueryResponse(const Fetcher::QueryResponseStatus& fetchResult, + Fetcher::NextAction* nextAction) { + _work(fetchResult, nextAction); + } + + void QueryFetcher::_delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, + Fetcher::NextAction* nextAction) { + _onQueryResponse(fetchResult, nextAction); + }; + + std::string QueryFetcher::getDiagnosticString() const { return str::stream() << "QueryFetcher -" << " responses: " << _responses << " fetcher: " << _fetcher.getDiagnosticString(); diff --git a/src/mongo/client/query_fetcher.h b/src/mongo/client/query_fetcher.h index 21f7461fb43..3ac11191e02 100644 --- a/src/mongo/client/query_fetcher.h +++ b/src/mongo/client/query_fetcher.h @@ -50,8 +50,8 @@ namespace mongo { class QueryFetcher { MONGO_DISALLOW_COPYING(QueryFetcher); public: - using BatchDataStatus = StatusWith<Fetcher::BatchData>; - using CallbackFn = stdx::function<void (const BatchDataStatus&, Fetcher::NextAction*)>; + using CallbackFn = + stdx::function<void (const Fetcher::QueryResponseStatus&, Fetcher::NextAction*)>; QueryFetcher(executor::TaskExecutor* exec, const HostAndPort& source, @@ -64,18 +64,24 @@ namespace mongo { Status schedule() { return _fetcher.schedule(); } void cancel() { return _fetcher.cancel(); } void wait() { if (_fetcher.isActive()) _fetcher.wait(); } - std::string toString() const; + std::string getDiagnosticString() const; protected: - void _onFetchCallback(const BatchDataStatus& fetchResult, + int _getResponses() const; + void _onFetchCallback(const Fetcher::QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob); - virtual void _delegateCallback(const BatchDataStatus& fetchResult, - Fetcher::NextAction* nextAction) { - _work(fetchResult, nextAction); - }; + /** + * Called by _delegateCallback() to forward query results to '_work'. + */ + void _onQueryResponse(const Fetcher::QueryResponseStatus& fetchResult, + Fetcher::NextAction* nextAction); + virtual void _delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, + Fetcher::NextAction* nextAction); + + private: executor::TaskExecutor* _exec; Fetcher _fetcher; int _responses; diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index e27fa77c7ec..5abe3c2ed84 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -185,7 +185,7 @@ namespace repl { _scheduleDbWorkFn = scheduleDbWorkFn; } - void CollectionCloner::_listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult, + void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { if (!fetchResult.isOK()) { @@ -224,7 +224,7 @@ namespace repl { _dbWorkCallbackHandle = scheduleResult.getValue(); } - void CollectionCloner::_findCallback(const StatusWith<Fetcher::BatchData>& fetchResult, + void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { if (!fetchResult.isOK()) { diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 19ff6850163..69f3caa1f18 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -123,14 +123,14 @@ namespace repl { /** * Read index specs from listIndexes result. */ - void _listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult, + void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob); /** * Read collection documents from find result. */ - void _findCallback(const StatusWith<Fetcher::BatchData>& fetchResult, + void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob); diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 287f81bca30..0344608dc78 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -114,7 +114,7 @@ std::string toString(DataReplicatorState s) { protected: - void _delegateCallback(const BatchDataStatus& fetchResult, + void _delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, NextAction* nextAction); const Timestamp _startTS; @@ -139,13 +139,12 @@ std::string toString(DataReplicatorState s) { std::string OplogFetcher::toString() const { return str::stream() << "OplogReader -" << " startTS: " << _startTS.toString() - << " responses: " << _responses - << " fetcher: " << _fetcher.getDiagnosticString(); + << " fetcher: " << QueryFetcher::getDiagnosticString(); } - void OplogFetcher::_delegateCallback(const BatchDataStatus& fetchResult, + void OplogFetcher::_delegateCallback(const Fetcher::QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction) { - const bool checkStartTS = _responses == 0; + const bool checkStartTS = _getResponses() == 0; if (fetchResult.isOK()) { Fetcher::Documents::const_iterator firstDoc = fetchResult.getValue().documents.begin(); @@ -155,40 +154,42 @@ std::string toString(DataReplicatorState s) { if (!hasDoc) { // Set next action to none. *nextAction = Fetcher::NextAction::kNoAction; - _work(Status(ErrorCodes::OplogStartMissing, str::stream() << - "No operations on sync source with op time starting at: " << - _startTS.toString()), - nextAction); + _onQueryResponse( + Status(ErrorCodes::OplogStartMissing, str::stream() << + "No operations on sync source with op time starting at: " << + _startTS.toString()), + nextAction); return; } else if ((*firstDoc)["ts"].eoo()) { // Set next action to none. *nextAction = Fetcher::NextAction::kNoAction; - _work(Status(ErrorCodes::OplogStartMissing, str::stream() << - "Missing 'ts' field in first returned " << (*firstDoc)["ts"] << - " starting at " << _startTS.toString()), - nextAction); + _onQueryResponse( + Status(ErrorCodes::OplogStartMissing, str::stream() << + "Missing 'ts' field in first returned " << (*firstDoc)["ts"] << + " starting at " << _startTS.toString()), + nextAction); return; } else if ((*firstDoc)["ts"].timestamp() != _startTS) { // Set next action to none. *nextAction = Fetcher::NextAction::kNoAction; - _work(Status(ErrorCodes::OplogStartMissing, - str::stream() << "First returned " << (*firstDoc)["ts"] - << " is not where we wanted to start: " - << _startTS.toString()), - nextAction); + _onQueryResponse( + Status(ErrorCodes::OplogStartMissing, str::stream() << + "First returned " << (*firstDoc)["ts"] << + " is not where we wanted to start: " << _startTS.toString()), + nextAction); return; } } if (hasDoc) { - _work(fetchResult, nextAction); + _onQueryResponse(fetchResult, nextAction); } else { } } else { - _work(fetchResult, nextAction); + _onQueryResponse(fetchResult, nextAction); } }; @@ -321,7 +322,7 @@ std::string toString(DataReplicatorState s) { const NamespaceString& oplogNS); void setStatus(const Status& s); void setStatus(const CBHStatus& s); - void _setTimestampStatus(const BatchDataStatus& fetchResult, + void _setTimestampStatus(const QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, TimestampStatus* status) ; }; @@ -352,7 +353,7 @@ std::string toString(DataReplicatorState s) { return timestampStatus; } - void InitialSyncState::_setTimestampStatus(const BatchDataStatus& fetchResult, + void InitialSyncState::_setTimestampStatus(const QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, TimestampStatus* status) { if (!fetchResult.isOK()) { @@ -863,7 +864,7 @@ std::string toString(DataReplicatorState s) { } } - void DataReplicator::_onApplierReadyStart(const BatchDataStatus& fetchResult, + void DataReplicator::_onApplierReadyStart(const QueryResponseStatus& fetchResult, NextAction* nextAction) { // Data clone done, move onto apply. TimestampStatus ts(ErrorCodes::OplogStartMissing, ""); @@ -1096,7 +1097,7 @@ std::string toString(DataReplicatorState s) { } } - void DataReplicator::_onMissingFetched(const BatchDataStatus& fetchResult, + void DataReplicator::_onMissingFetched(const QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, const Operations& ops, const NamespaceString nss) { @@ -1290,7 +1291,7 @@ std::string toString(DataReplicatorState s) { return status; } - void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::BatchData>& fetchResult, + void DataReplicator::_onOplogFetchFinish(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction) { const Status status = fetchResult.getStatus(); if (status.code() == ErrorCodes::CallbackCanceled) diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index aa347ae7dd2..8f1717db09a 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -54,7 +54,7 @@ class QueryFetcher; namespace repl { using Operations = Applier::Operations; -using BatchDataStatus = StatusWith<Fetcher::BatchData>; +using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>; using CallbackArgs = ReplicationExecutor::CallbackArgs; using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>; using CommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs; @@ -199,7 +199,7 @@ private: // Only executed via executor void _resumeFinish(CallbackArgs cbData); - void _onOplogFetchFinish(const BatchDataStatus& fetchResult, + void _onOplogFetchFinish(const QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction); void _doNextActions(); void _doNextActions_InitialSync_inlock(); @@ -219,14 +219,14 @@ private: void _handleFailedApplyBatch(const TimestampStatus&, const Operations&); // Fetches the last doc from the first operation, and reschedules the apply for the ops. void _scheduleApplyAfterFetch(const Operations&); - void _onMissingFetched(const BatchDataStatus& fetchResult, + void _onMissingFetched(const QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, const Operations& ops, const NamespaceString nss); void _onDataClonerFinish(const Status& status); // Called after _onDataClonerFinish when the new Timestamp is avail, to use for minvalid - void _onApplierReadyStart(const BatchDataStatus& fetchResult, + void _onApplierReadyStart(const QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction); Status _scheduleApplyBatch(); diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index dc0f6f8990a..b96ab403169 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -186,7 +186,7 @@ namespace { _startCollectionCloner = startCollectionCloner; } - void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::BatchData>& result, + void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& result, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 071b33d0e96..f1171bf5e4b 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -138,7 +138,7 @@ namespace repl { /** * Read collection names and options from listCollections result. */ - void _listCollectionsCallback(const StatusWith<Fetcher::BatchData>& fetchResult, + void _listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index ae0edd39146..01efc45aaa4 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -258,7 +258,7 @@ namespace { Status status = Status(ErrorCodes::InternalError, "Internal error running find command"); vector<BSONObj> results; - auto fetcherCallback = [&status, &results](const QueryFetcher::BatchDataStatus& dataStatus, + auto fetcherCallback = [&status, &results](const Fetcher::QueryResponseStatus& dataStatus, Fetcher::NextAction* nextAction) { // Throw out any accumulated results on error |