summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner.h
diff options
context:
space:
mode:
authorVesselina Ratcheva <vesselina.ratcheva@mongodb.com>2020-01-16 23:26:34 +0000
committerevergreen <evergreen@mongodb.com>2020-01-16 23:26:34 +0000
commit8003fc047c5c742829ba80b86ec2e6c5bb4e8453 (patch)
tree9aad28e028559b1832035ec9c40bafb2169f6e1c /src/mongo/db/repl/collection_cloner.h
parent02f3b3a204bdc0e1b157a3839be91dc56f685077 (diff)
downloadmongo-8003fc047c5c742829ba80b86ec2e6c5bb4e8453.tar.gz
SERVER-43276 Implement resume after network error functionality in CollectionCloner query
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.h')
-rw-r--r--src/mongo/db/repl/collection_cloner.h38
1 files changed, 37 insertions, 1 deletions
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 31a469bffbe..764fcefb6dd 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -132,6 +132,18 @@ private:
AfterStageBehavior run() override;
};
+ class CollectionClonerQueryStage : public CollectionClonerStage {
+ public:
+ CollectionClonerQueryStage(std::string name,
+ CollectionCloner* cloner,
+ ClonerRunFn stageFunc)
+ : CollectionClonerStage(name, cloner, stageFunc) {}
+
+ bool isTransientError(const Status& status) override {
+ return ErrorCodes::isRetriableError(status);
+ }
+ };
+
std::string describeForFuzzer(BaseClonerStage* stage) const final {
return _sourceNss.db() + " db: { " + stage->getName() + ": UUID(\"" +
_sourceDbAndUuid.uuid()->toString() + "\") coll: " + _sourceNss.coll() + " }";
@@ -186,6 +198,18 @@ private:
*/
void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd);
+ /**
+ * Sends a query command to the source. That query command with be parameterized based on
+ * wire version and clone progress.
+ */
+ void runQuery();
+
+ /**
+ * Attempts to clean up the cursor on the upstream node. This is called any time we
+ * receive a transient error during the query stage.
+ */
+ void killOldQueryCursor();
+
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
@@ -203,7 +227,7 @@ private:
CollectionClonerStage _countStage; // (R)
CollectionClonerStage _listIndexesStage; // (R)
CollectionClonerStage _createCollectionStage; // (R)
- CollectionClonerStage _queryStage; // (R)
+ CollectionClonerQueryStage _queryStage; // (R)
ProgressMeter _progressMeter; // (X) progress meter for this instance.
std::vector<BSONObj> _indexSpecs; // (X) Except for _id_
@@ -217,6 +241,18 @@ private:
// Putting _dbWorkTaskRunner last ensures anything the database work threads depend on,
// like _documentsToInsert, is destroyed after those threads exit.
TaskRunner _dbWorkTaskRunner; // (R)
+
+ // Does the sync source support resumable queries? (wire version 4.4+)
+ bool _resumeSupported = false; // (X)
+
+ // The resumeToken used to resume after network error.
+ boost::optional<BSONObj> _resumeToken; // (X)
+
+ // The cursorId of the remote collection cursor.
+ long long _remoteCursorId = -1; // (X)
+
+ // If true, it means we are starting a new query or resuming an interrupted one.
+ bool _firstBatchOfQueryRound = true; // (X)
};
} // namespace repl