summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVesselina Ratcheva <vesselina.ratcheva@mongodb.com>2020-01-16 23:26:34 +0000
committerevergreen <evergreen@mongodb.com>2020-01-16 23:26:34 +0000
commit8003fc047c5c742829ba80b86ec2e6c5bb4e8453 (patch)
tree9aad28e028559b1832035ec9c40bafb2169f6e1c
parent02f3b3a204bdc0e1b157a3839be91dc56f685077 (diff)
downloadmongo-8003fc047c5c742829ba80b86ec2e6c5bb4e8453.tar.gz
SERVER-43276 Implement resume after network error functionality in CollectionCloner query
-rw-r--r--src/mongo/client/SConscript11
-rw-r--r--src/mongo/client/dbclient_cursor.cpp2
-rw-r--r--src/mongo/client/dbclient_cursor.h16
-rw-r--r--src/mongo/client/dbclient_mockcursor.cpp95
-rw-r--r--src/mongo/client/dbclient_mockcursor.h40
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp90
-rw-r--r--src/mongo/db/repl/collection_cloner.h38
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp625
-rw-r--r--src/mongo/dbtests/SConscript1
-rw-r--r--src/mongo/dbtests/mock/mock_dbclient_connection.cpp42
10 files changed, 928 insertions, 32 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index be7fdfe56c2..760693b0842 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -374,3 +374,14 @@ env.CppIntegrationTest(
'clientdriver_network',
],
)
+
+env.Library(
+ target='dbclient_mockcursor',
+ source=[
+ 'dbclient_mockcursor.cpp'
+ ],
+ LIBDEPS_PRIVATE=[
+ 'clientdriver_minimal'
+ ],
+)
+
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp
index 244e893f88b..3c167ee1205 100644
--- a/src/mongo/client/dbclient_cursor.cpp
+++ b/src/mongo/client/dbclient_cursor.cpp
@@ -332,6 +332,8 @@ void DBClientCursor::dataReceived(const Message& reply, bool& retry, string& hos
!(_connectionHasPendingReplies && cursorId == 0));
ns = cr.getNSS(); // Unlike OP_REPLY, find command can change the ns to use for getMores.
+ // Store the resume token, if we got one.
+ _postBatchResumeToken = cr.getPostBatchResumeToken();
batch.objs = cr.releaseBatch();
return;
}
diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h
index b342d432443..b0e1551802b 100644
--- a/src/mongo/client/dbclient_cursor.h
+++ b/src/mongo/client/dbclient_cursor.h
@@ -244,6 +244,13 @@ public:
_lastKnownCommittedOpTime = lastCommittedOpTime;
}
+ /**
+ * Returns the resume token for the latest batch, if set.
+ */
+ virtual boost::optional<BSONObj> getPostBatchResumeToken() const {
+ return _postBatchResumeToken;
+ }
+
protected:
struct Batch {
// TODO remove constructors after c++17 toolchain upgrade
@@ -299,6 +306,7 @@ private:
Milliseconds _awaitDataTimeout = Milliseconds{0};
boost::optional<long long> _term;
boost::optional<repl::OpTime> _lastKnownCommittedOpTime;
+ boost::optional<BSONObj> _postBatchResumeToken;
void dataReceived(const Message& reply) {
bool retry;
@@ -341,6 +349,14 @@ public:
return _c.getNamespaceString();
}
+ const long long getCursorId() const {
+ return _c.getCursorId();
+ }
+
+ boost::optional<BSONObj> getPostBatchResumeToken() const {
+ return _c.getPostBatchResumeToken();
+ }
+
private:
DBClientCursor& _c;
int _n;
diff --git a/src/mongo/client/dbclient_mockcursor.cpp b/src/mongo/client/dbclient_mockcursor.cpp
new file mode 100644
index 00000000000..d70fe3b3d06
--- /dev/null
+++ b/src/mongo/client/dbclient_mockcursor.cpp
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/dbclient_mockcursor.h"
+
+#include "mongo/client/dbclient_cursor.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(mockCursorThrowErrorOnGetMore);
+
+DBClientMockCursor::DBClientMockCursor(mongo::DBClientBase* client,
+ const BSONArray& mockCollection,
+ const bool provideResumeToken,
+ unsigned long batchSize)
+ : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0),
+ _collectionArray(mockCollection),
+ _iter(_collectionArray),
+ _provideResumeToken(provideResumeToken),
+ _batchSize(batchSize) {
+ if (_batchSize)
+ setBatchSize(_batchSize);
+ _fillNextBatch();
+}
+
+bool DBClientMockCursor::more() {
+
+ // Throw if requested via failpoint.
+ mockCursorThrowErrorOnGetMore.execute([&](const BSONObj& data) {
+ auto errorString = data["errorType"].valueStringDataSafe();
+ auto errorCode = ErrorCodes::fromString(errorString);
+
+ std::string message = str::stream()
+ << "mockCursorThrowErrorOnGetMore throwing error for test: " << errorString;
+ uasserted(errorCode, message);
+ });
+
+ if (_batchSize && batch.pos == _batchSize) {
+ _fillNextBatch();
+ }
+ return batch.pos < batch.objs.size();
+}
+
+void DBClientMockCursor::_fillNextBatch() {
+ int leftInBatch = _batchSize;
+ batch.objs.clear();
+ while (_iter.more() && (!_batchSize || leftInBatch--)) {
+ batch.objs.emplace_back(_iter.next().Obj());
+ }
+ batch.pos = 0;
+
+ // Store a mock resume token, if applicable.
+ if (!batch.objs.empty()) {
+ auto lastId = batch.objs.back()["_id"].numberInt();
+ _postBatchResumeToken = BSON("n" << lastId);
+ }
+}
+
+boost::optional<BSONObj> DBClientMockCursor::getPostBatchResumeToken() const {
+ if (!_provideResumeToken) {
+ return boost::none;
+ }
+ return _postBatchResumeToken;
+}
+
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/client/dbclient_mockcursor.h b/src/mongo/client/dbclient_mockcursor.h
index ec83b424a7d..11c9181e07c 100644
--- a/src/mongo/client/dbclient_mockcursor.h
+++ b/src/mongo/client/dbclient_mockcursor.h
@@ -29,9 +29,12 @@
#pragma once
+#include "mongo/platform/basic.h"
+
#include "mongo/client/dbclient_cursor.h"
namespace mongo {
+
// DBClientMockCursor supports only a small subset of DBClientCursor operations.
// It supports only iteration, including use of DBClientCursorBatchIterator. If a batchsize
// is given, iteration is broken up into multiple batches at batchSize boundaries.
@@ -39,38 +42,29 @@ class DBClientMockCursor : public DBClientCursor {
public:
DBClientMockCursor(mongo::DBClientBase* client,
const BSONArray& mockCollection,
- unsigned long batchSize = 0)
- : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0),
- _collectionArray(mockCollection),
- _iter(_collectionArray),
- _batchSize(batchSize) {
- if (_batchSize)
- setBatchSize(_batchSize);
- fillNextBatch();
- }
+ const bool provideResumeToken = false,
+ unsigned long batchSize = 0);
virtual ~DBClientMockCursor() {}
- bool more() override {
- if (_batchSize && batch.pos == _batchSize) {
- fillNextBatch();
- }
- return batch.pos < batch.objs.size();
- }
+ bool more() override;
+
+ // Override to return a mock resume token.
+ // The format of the token is simply {n: <idOfLastDocReturned>}.
+ boost::optional<BSONObj> getPostBatchResumeToken() const override;
private:
- void fillNextBatch() {
- int leftInBatch = _batchSize;
- batch.objs.clear();
- while (_iter.more() && (!_batchSize || leftInBatch--)) {
- batch.objs.emplace_back(_iter.next().Obj());
- }
- batch.pos = 0;
- }
+ void _fillNextBatch();
+
// The BSONObjIterator expects the underlying BSONObj to stay in scope while the
// iterator is in use, so we store it here.
BSONArray _collectionArray;
BSONObjIterator _iter;
+
+ // A simple mock resume token that contains the id of the last document returned.
+ boost::optional<BSONObj> _postBatchResumeToken;
+ bool _provideResumeToken = false;
+
unsigned long _batchSize;
// non-copyable , non-assignable
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index c84f9940452..46560776f13 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner_gen.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/wire_version.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -91,6 +92,9 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss,
invariant(collectionOptions.uuid);
_sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid);
_stats.ns = _sourceNss.ns();
+
+ // Find out whether the sync source supports resumable queries.
+ _resumeSupported = (getClient()->getMaxWireVersion() == WireVersion::PLACEHOLDER_FOR_44);
}
BaseCloner::ClonerStages CollectionCloner::getStages() {
@@ -179,13 +183,9 @@ BaseCloner::AfterStageBehavior CollectionCloner::createCollectionStage() {
}
BaseCloner::AfterStageBehavior CollectionCloner::queryStage() {
- getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
- _sourceDbAndUuid,
- QUERY("query" << BSONObj() << "$readOnce" << true),
- nullptr /* fieldsToReturn */,
- QueryOption_NoCursorTimeout | QueryOption_SlaveOk |
- (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
- _collectionClonerBatchSize);
+ // Attempt to clean up cursor from the last retry (if applicable).
+ killOldQueryCursor();
+ runQuery();
waitForDatabaseWorkToComplete();
// We want to free the _collLoader regardless of whether the commit succeeds.
std::unique_ptr<CollectionBulkLoader> loader = std::move(_collLoader);
@@ -193,6 +193,49 @@ BaseCloner::AfterStageBehavior CollectionCloner::queryStage() {
return kContinueNormally;
}
+void CollectionCloner::runQuery() {
+ // Non-resumable query.
+ Query query = QUERY("query" << BSONObj() << "$readOnce" << true);
+
+ if (_resumeSupported) {
+ if (_resumeToken) {
+ // Resume the query from where we left off.
+ query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken"
+ << true << "$_resumeAfter" << _resumeToken.get());
+ } else {
+ // New attempt at a resumable query.
+ query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken"
+ << true);
+ }
+ }
+
+ // We reset this every time we retry or resume a query.
+ // We distinguish the first batch from the rest so that we only store the remote cursor id
+ // the first time we get it.
+ _firstBatchOfQueryRound = true;
+
+ try {
+ getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
+ _sourceDbAndUuid,
+ query,
+ nullptr /* fieldsToReturn */,
+ QueryOption_NoCursorTimeout | QueryOption_SlaveOk |
+ (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
+ _collectionClonerBatchSize);
+ } catch (...) {
+ auto status = exceptionToStatus();
+
+ if (!_resumeSupported) {
+ std::string message = str::stream()
+ << "Collection clone failed and is not resumable. nss: " << _sourceNss;
+ log() << message;
+ uasserted(ErrorCodes::InitialSyncFailure, message);
+ }
+
+ throw;
+ }
+}
+
void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
{
stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData());
@@ -205,6 +248,12 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
}
}
+ if (_firstBatchOfQueryRound && _resumeSupported) {
+ // Store the cursorId of the remote cursor.
+ _remoteCursorId = iter.getCursorId();
+ }
+ _firstBatchOfQueryRound = false;
+
{
stdx::lock_guard<Latch> lk(_mutex);
_stats.receivedBatches++;
@@ -224,6 +273,11 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
uassertStatusOK(newStatus);
}
+ if (_resumeSupported) {
+ // Store the resume token for this batch.
+ _resumeToken = iter.getPostBatchResumeToken();
+ }
+
initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf(
[&](const BSONObj&) {
while (MONGO_unlikely(
@@ -288,6 +342,28 @@ void CollectionCloner::waitForDatabaseWorkToComplete() {
_dbWorkTaskRunner.join();
}
+void CollectionCloner::killOldQueryCursor() {
+ // No cursor stored. Do nothing.
+ if (_remoteCursorId == -1) {
+ return;
+ }
+
+ BSONObj infoObj;
+ auto nss = _sourceNss;
+ auto id = _remoteCursorId;
+
+ auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id));
+
+ try {
+ getClient()->runCommand(nss.db().toString(), cmdObj, infoObj);
+ } catch (...) {
+ log() << "Error while trying to kill remote cursor after transient query error";
+ }
+
+ // Clear the stored cursorId on success.
+ _remoteCursorId = -1;
+}
+
CollectionCloner::Stats CollectionCloner::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
return _stats;
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 31a469bffbe..764fcefb6dd 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -132,6 +132,18 @@ private:
AfterStageBehavior run() override;
};
+ class CollectionClonerQueryStage : public CollectionClonerStage {
+ public:
+ CollectionClonerQueryStage(std::string name,
+ CollectionCloner* cloner,
+ ClonerRunFn stageFunc)
+ : CollectionClonerStage(name, cloner, stageFunc) {}
+
+ bool isTransientError(const Status& status) override {
+ return ErrorCodes::isRetriableError(status);
+ }
+ };
+
std::string describeForFuzzer(BaseClonerStage* stage) const final {
return _sourceNss.db() + " db: { " + stage->getName() + ": UUID(\"" +
_sourceDbAndUuid.uuid()->toString() + "\") coll: " + _sourceNss.coll() + " }";
@@ -186,6 +198,18 @@ private:
*/
void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd);
+ /**
+ * Sends a query command to the source. That query command will be parameterized based on
+ * wire version and clone progress.
+ */
+ void runQuery();
+
+ /**
+ * Attempts to clean up the cursor on the upstream node. This is called any time we
+ * receive a transient error during the query stage.
+ */
+ void killOldQueryCursor();
+
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
@@ -203,7 +227,7 @@ private:
CollectionClonerStage _countStage; // (R)
CollectionClonerStage _listIndexesStage; // (R)
CollectionClonerStage _createCollectionStage; // (R)
- CollectionClonerStage _queryStage; // (R)
+ CollectionClonerQueryStage _queryStage; // (R)
ProgressMeter _progressMeter; // (X) progress meter for this instance.
std::vector<BSONObj> _indexSpecs; // (X) Except for _id_
@@ -217,6 +241,18 @@ private:
// Putting _dbWorkTaskRunner last ensures anything the database work threads depend on,
// like _documentsToInsert, is destroyed after those threads exit.
TaskRunner _dbWorkTaskRunner; // (R)
+
+ // Does the sync source support resumable queries? (wire version 4.4+)
+ bool _resumeSupported = false; // (X)
+
+ // The resumeToken used to resume after network error.
+ boost::optional<BSONObj> _resumeToken; // (X)
+
+ // The cursorId of the remote collection cursor.
+ long long _remoteCursorId = -1; // (X)
+
+ // If true, it means we are starting a new query or resuming an interrupted one.
+ bool _firstBatchOfQueryRound = true; // (X)
};
} // namespace repl
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 78ec583b37a..b6c4ce672d8 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -78,6 +78,8 @@ protected:
};
_storageInterface.createCollectionForBulkFn = _standardCreateCollectionFn;
+ _mockClient->setWireVersions(WireVersion::PLACEHOLDER_FOR_44,
+ WireVersion::PLACEHOLDER_FOR_44);
_mockServer->assignCollectionUuid(_nss.ns(), _collUuid);
_mockServer->setCommandReply("replSetGetRBID",
BSON("ok" << 1 << "rbid" << _sharedData->getRollBackId()));
@@ -480,5 +482,628 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_EQ(collNss, _nss);
}
+TEST_F(CollectionClonerTest, NonResumableQuerySuccess) {
+ // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+
+ ASSERT_OK(cloner->run());
+
+ ASSERT_EQUALS(3, _collectionStats->insertCount);
+ ASSERT_TRUE(_collectionStats->commitCalled);
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(3u, stats.documentsCopied);
+}
+
+TEST_F(CollectionClonerTest, NonResumableQueryFailure) {
+ // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable");
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ // Let us begin with the query stage.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+
+ clonerThread.join();
+}
+
+// We will retry our query without having yet obtained a resume token.
+TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ // Preliminary setup for failpoints.
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto cloner = makeCollectionCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down. We will fail right before our first batch.
+ _mockServer->shutdown();
+
+ // Let the cloner retry and wait until just before it.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry);
+
+ // Verify we haven't been able to receive anything yet.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(0, stats.receivedBatches);
+
+ // Bring the server back up.
+ _mockServer->reboot();
+
+ // Let the retry commence.
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+
+ clonerThread.join();
+
+ // Check that we've received all the data.
+ ASSERT_EQUALS(3, _collectionStats->insertCount);
+ ASSERT_TRUE(_collectionStats->commitCalled);
+ stats = cloner->getStats();
+ ASSERT_EQUALS(3u, stats.documentsCopied);
+}
+
+// We will resume our query using the resume token we stored after receiving the first batch.
+TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(5));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 4));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 5));
+
+ // Preliminary setup for hanging failpoint.
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait for us to process the first batch.
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once (transiently).
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+ // Let the query stage finish.
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // Since the CollectionMockStats class does not de-duplicate inserts, it is possible to insert
+ // the same document more than once, thereby also increasing the insertCount more than once.
+ // This means that here insertCount=5 is evidence that we correctly resumed our query where we
+ // left off (2 inserts in) instead of retrying the whole query (that leads to insertCount=7).
+ ASSERT_EQUALS(5, _collectionStats->insertCount);
+ ASSERT_TRUE(_collectionStats->commitCalled);
+ stats = cloner->getStats();
+ ASSERT_EQUALS(5u, stats.documentsCopied);
+}
+
+TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, status);
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Verify we've made no progress yet.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(0, stats.receivedBatches);
+
+ // This will cause the next batch to fail once, non-transiently.
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
+
+ // Let us begin with the query stage.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+
+ clonerThread.join();
+}
+
+TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, status);
+ });
+
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once, non-transiently.
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
+
+ // Allow the cloner to attempt (and fail) the next batch.
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+
+ clonerThread.join();
+}
+
+TEST_F(CollectionClonerTest, ResumableQueryKillCursorsNetworkError) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ // Preliminary setup for hanging failpoint.
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait for us to process the first batch.
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once (transiently).
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+ // Prepare the network error response to 'killCursors'.
+ // This will cause us to retry the query stage.
+ _mockServer->setCommandReply("killCursors",
+ Status{ErrorCodes::HostUnreachable, "HostUnreachable for test"});
+
+ // Let the query stage finish.
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ ASSERT_EQUALS(3, _collectionStats->insertCount);
+ ASSERT_TRUE(_collectionStats->commitCalled);
+ stats = cloner->getStats();
+ ASSERT_EQUALS(3u, stats.documentsCopied);
+}
+
+TEST_F(CollectionClonerTest, ResumableQueryKillCursorsOtherError) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ // Preliminary setup for hanging failpoint.
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait for us to process the first batch.
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once (transiently).
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+    // Prepare a non-network error response to 'killCursors'.
+    // This error should be ignored, so we expect to resume (not retry) the query.
+ _mockServer->setCommandReply("killCursors",
+ Status{ErrorCodes::UnknownError, "UnknownError for test"});
+
+ // Let the query stage finish.
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // We ignored the 'killCursors' request and just resumed the query. We should have cloned every
+ // document exactly once.
+ ASSERT_EQUALS(3, _collectionStats->insertCount);
+ ASSERT_TRUE(_collectionStats->commitCalled);
+ stats = cloner->getStats();
+ ASSERT_EQUALS(3u, stats.documentsCopied);
+}
+
+// We retry the query after a transient error and we immediately encounter a non-retriable one.
+TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(5));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ // Preliminary setup for hanging failpoints.
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, status);
+ });
+
+ // Wait for us to process the first batch.
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once (transiently).
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+    // Follow up the transient error with a non-retriable one.
+    failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
+
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ // We only made it one batch in before failing.
+ stats = cloner->getStats();
+ ASSERT_EQUALS(1u, stats.receivedBatches);
+}
+
+// We retry the query after a transient error, we make a bit more progress and then we encounter
+// a non-retriable one.
+TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) {
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(5));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 4));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 5));
+
+ // Preliminary setup for hanging failpoints.
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, status);
+ });
+
+ // Wait for us to process the first batch.
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once (transiently).
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+ // Do a quick failpoint dance so we clone one more batch before failing.
+ timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+    // Follow up the transient error with a non-retriable one.
+    failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
+
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+    // We made it two batches in before failing.
+ stats = cloner->getStats();
+ ASSERT_EQUALS(2u, stats.receivedBatches);
+}
+
+// We resume a query, receive some more data, then get a transient error again. The goal of this
+// test is to make sure we don't forget to request the _next_ resume token when resuming a query.
+TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) {
+
+ /**
+ * Test runs like so:
+ *
+ * |___batch___| . |___batch___| |___batch___| . |batch|
+ * | |
+ * resume 1 resume 2
+ */
+
+ _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+ _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(5));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 4));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 5));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 6));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 7));
+
+ // Preliminary setup for hanging failpoints.
+ auto afterBatchFailpoint =
+ globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse");
+ auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait for us to process the first batch.
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+ // Verify we've only managed to store one batch.
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1, stats.receivedBatches);
+
+ // This will cause the next batch to fail once (transiently).
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+ // Allow copying two more batches before the next error.
+ // It is important that the resumes come after differing amounts of progress, so that we can
+ // more easily distinguish error scenarios based on document count. (see end of test)
+ failNextBatch->setMode(FailPoint::skip, 2, fromjson("{errorType: 'HostUnreachable'}"));
+
+ // Do a failpoint dance so we can get to the next retry.
+ timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+ timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+ afterBatchFailpoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+ // Allow the clone to finish.
+ failNextBatch->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+
+ clonerThread.join();
+
+ /**
+ * Since the CollectionMockStats class does not de-duplicate inserts, it is possible to insert
+ * the same document more than once, thereby also increasing the insertCount more than once.
+ * We can therefore infer the resume history from the insertCount. In this test:
+ * - insertCount = 7: all the resumes were correct and we got every doc exactly once
+ * - this is the correct result
+ * - insertCount = 9: the first resume retried instead of resuming (second resume was correct)
+ * - insertCount = 11: the second resume used the first resume token instead of the second one
+ * - we test that we avoid this result
+ * - insertCount = 13: the second resume retried instead of resuming (first one was correct)
+ */
+
+ ASSERT_EQUALS(7, _collectionStats->insertCount);
+ ASSERT_TRUE(_collectionStats->commitCalled);
+ stats = cloner->getStats();
+ ASSERT_EQUALS(7u, stats.documentsCopied);
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 0e440b47168..c867767b612 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -53,6 +53,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/client/dbclient_mockcursor',
'$BUILD_DIR/mongo/db/repl/replica_set_messages'
],
)
diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
index 23f5af622eb..79afb8861f8 100644
--- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
+++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp
@@ -112,8 +112,48 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query(
queryOptions,
batchSize));
+ BSONArray resultsInCursor;
+
+    // A simple mock implementation of a resumable query, where we skip the first 'n' documents
+    // where 'n' is given by the mock resume token.
+ auto nToSkip = 0;
+ auto queryBson = fromjson(query.toString());
+ if (queryBson.hasField("$_resumeAfter")) {
+ if (queryBson["$_resumeAfter"].Obj().hasField("n")) {
+ nToSkip = queryBson["$_resumeAfter"]["n"].numberInt();
+ }
+ }
+
+ bool provideResumeToken = false;
+ if (queryBson.hasField("$_requestResumeToken")) {
+ provideResumeToken = true;
+ }
+
+ // Resume query.
+ if (nToSkip != 0) {
+ BSONObjIterator iter(result);
+ BSONArrayBuilder builder;
+ auto numExamined = 0;
+
+ while (iter.more()) {
+ numExamined++;
+
+ if (numExamined < nToSkip + 1) {
+ iter.next();
+ continue;
+ }
+
+ builder.append(iter.next().Obj());
+ }
+ resultsInCursor = BSONArray(builder.obj());
+ } else {
+ // Yield all results instead (default).
+ resultsInCursor = BSONArray(result.copy());
+ }
+
std::unique_ptr<mongo::DBClientCursor> cursor;
- cursor.reset(new DBClientMockCursor(this, BSONArray(result.copy()), batchSize));
+ cursor.reset(new DBClientMockCursor(
+ this, BSONArray(resultsInCursor), provideResumeToken, batchSize));
return cursor;
} catch (const mongo::DBException&) {
_isFailed = true;