diff options
author | Vesselina Ratcheva <vesselina.ratcheva@mongodb.com> | 2020-01-16 23:26:34 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-01-16 23:26:34 +0000 |
commit | 8003fc047c5c742829ba80b86ec2e6c5bb4e8453 (patch) | |
tree | 9aad28e028559b1832035ec9c40bafb2169f6e1c | |
parent | 02f3b3a204bdc0e1b157a3839be91dc56f685077 (diff) | |
download | mongo-8003fc047c5c742829ba80b86ec2e6c5bb4e8453.tar.gz |
SERVER-43276 Implement resume after network error functionality in CollectionCloner query
-rw-r--r-- | src/mongo/client/SConscript | 11 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.cpp | 2 | ||||
-rw-r--r-- | src/mongo/client/dbclient_cursor.h | 16 | ||||
-rw-r--r-- | src/mongo/client/dbclient_mockcursor.cpp | 95 | ||||
-rw-r--r-- | src/mongo/client/dbclient_mockcursor.h | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 90 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 625 | ||||
-rw-r--r-- | src/mongo/dbtests/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/dbtests/mock/mock_dbclient_connection.cpp | 42 |
10 files changed, 928 insertions, 32 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index be7fdfe56c2..760693b0842 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -374,3 +374,14 @@ env.CppIntegrationTest( 'clientdriver_network', ], ) + +env.Library( + target='dbclient_mockcursor', + source=[ + 'dbclient_mockcursor.cpp' + ], + LIBDEPS_PRIVATE=[ + 'clientdriver_minimal' + ], +) + diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index 244e893f88b..3c167ee1205 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -332,6 +332,8 @@ void DBClientCursor::dataReceived(const Message& reply, bool& retry, string& hos !(_connectionHasPendingReplies && cursorId == 0)); ns = cr.getNSS(); // Unlike OP_REPLY, find command can change the ns to use for getMores. + // Store the resume token, if we got one. + _postBatchResumeToken = cr.getPostBatchResumeToken(); batch.objs = cr.releaseBatch(); return; } diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index b342d432443..b0e1551802b 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -244,6 +244,13 @@ public: _lastKnownCommittedOpTime = lastCommittedOpTime; } + /** + * Returns the resume token for the latest batch, it set. 
+ */ + virtual boost::optional<BSONObj> getPostBatchResumeToken() const { + return _postBatchResumeToken; + } + protected: struct Batch { // TODO remove constructors after c++17 toolchain upgrade @@ -299,6 +306,7 @@ private: Milliseconds _awaitDataTimeout = Milliseconds{0}; boost::optional<long long> _term; boost::optional<repl::OpTime> _lastKnownCommittedOpTime; + boost::optional<BSONObj> _postBatchResumeToken; void dataReceived(const Message& reply) { bool retry; @@ -341,6 +349,14 @@ public: return _c.getNamespaceString(); } + const long long getCursorId() const { + return _c.getCursorId(); + } + + boost::optional<BSONObj> getPostBatchResumeToken() const { + return _c.getPostBatchResumeToken(); + } + private: DBClientCursor& _c; int _n; diff --git a/src/mongo/client/dbclient_mockcursor.cpp b/src/mongo/client/dbclient_mockcursor.cpp new file mode 100644 index 00000000000..d70fe3b3d06 --- /dev/null +++ b/src/mongo/client/dbclient_mockcursor.cpp @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/dbclient_mockcursor.h" + +#include "mongo/client/dbclient_cursor.h" +#include "mongo/util/fail_point.h" + +namespace mongo { + +MONGO_FAIL_POINT_DEFINE(mockCursorThrowErrorOnGetMore); + +DBClientMockCursor::DBClientMockCursor(mongo::DBClientBase* client, + const BSONArray& mockCollection, + const bool provideResumeToken, + unsigned long batchSize) + : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0), + _collectionArray(mockCollection), + _iter(_collectionArray), + _provideResumeToken(provideResumeToken), + _batchSize(batchSize) { + if (_batchSize) + setBatchSize(_batchSize); + _fillNextBatch(); +} + +bool DBClientMockCursor::more() { + + // Throw if requested via failpoint. + mockCursorThrowErrorOnGetMore.execute([&](const BSONObj& data) { + auto errorString = data["errorType"].valueStringDataSafe(); + auto errorCode = ErrorCodes::fromString(errorString); + + std::string message = str::stream() + << "mockCursorThrowErrorOnGetMore throwing error for test: " << errorString; + uasserted(errorCode, message); + }); + + if (_batchSize && batch.pos == _batchSize) { + _fillNextBatch(); + } + return batch.pos < batch.objs.size(); +} + +void DBClientMockCursor::_fillNextBatch() { + int leftInBatch = _batchSize; + batch.objs.clear(); + while (_iter.more() && (!_batchSize || leftInBatch--)) { + batch.objs.emplace_back(_iter.next().Obj()); + } + batch.pos = 0; + + // Store a mock resume token, if applicable. 
+ if (!batch.objs.empty()) { + auto lastId = batch.objs.back()["_id"].numberInt(); + _postBatchResumeToken = BSON("n" << lastId); + } +} + +boost::optional<BSONObj> DBClientMockCursor::getPostBatchResumeToken() const { + if (!_provideResumeToken) { + return boost::none; + } + return _postBatchResumeToken; +} + +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/client/dbclient_mockcursor.h b/src/mongo/client/dbclient_mockcursor.h index ec83b424a7d..11c9181e07c 100644 --- a/src/mongo/client/dbclient_mockcursor.h +++ b/src/mongo/client/dbclient_mockcursor.h @@ -29,9 +29,12 @@ #pragma once +#include "mongo/platform/basic.h" + #include "mongo/client/dbclient_cursor.h" namespace mongo { + // DBClientMockCursor supports only a small subset of DBClientCursor operations. // It supports only iteration, including use of DBClientCursorBatchIterator. If a batchsize // is given, iteration is broken up into multiple batches at batchSize boundaries. @@ -39,38 +42,29 @@ class DBClientMockCursor : public DBClientCursor { public: DBClientMockCursor(mongo::DBClientBase* client, const BSONArray& mockCollection, - unsigned long batchSize = 0) - : mongo::DBClientCursor(client, NamespaceString(), 0, 0, 0), - _collectionArray(mockCollection), - _iter(_collectionArray), - _batchSize(batchSize) { - if (_batchSize) - setBatchSize(_batchSize); - fillNextBatch(); - } + const bool provideResumeToken = false, + unsigned long batchSize = 0); virtual ~DBClientMockCursor() {} - bool more() override { - if (_batchSize && batch.pos == _batchSize) { - fillNextBatch(); - } - return batch.pos < batch.objs.size(); - } + bool more() override; + + // Override to return a mock resume token. + // The format of the token is simply {n: <idOfLastDocReturned>}. + boost::optional<BSONObj> getPostBatchResumeToken() const override; private: - void fillNextBatch() { - int leftInBatch = _batchSize; - batch.objs.clear(); - while (_iter.more() && (!_batchSize || leftInBatch--)) { - batch.objs.emplace_back(_iter.next().Obj()); - } - batch.pos = 0; - } + void _fillNextBatch(); + // The BSONObjIterator expects the underlying BSONObj to stay in scope while the // iterator is in use, so we store it here. 
BSONArray _collectionArray; BSONObjIterator _iter; + + // A simple mock resume token that contains the id of the last document returned. + boost::optional<BSONObj> _postBatchResumeToken; + bool _provideResumeToken = false; + unsigned long _batchSize; // non-copyable , non-assignable diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index c84f9940452..46560776f13 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner_gen.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/wire_version.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -91,6 +92,9 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss, invariant(collectionOptions.uuid); _sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid); _stats.ns = _sourceNss.ns(); + + // Find out whether the sync source supports resumable queries. + _resumeSupported = (getClient()->getMaxWireVersion() == WireVersion::PLACEHOLDER_FOR_44); } BaseCloner::ClonerStages CollectionCloner::getStages() { @@ -179,13 +183,9 @@ BaseCloner::AfterStageBehavior CollectionCloner::createCollectionStage() { } BaseCloner::AfterStageBehavior CollectionCloner::queryStage() { - getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, - _sourceDbAndUuid, - QUERY("query" << BSONObj() << "$readOnce" << true), - nullptr /* fieldsToReturn */, - QueryOption_NoCursorTimeout | QueryOption_SlaveOk | - (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), - _collectionClonerBatchSize); + // Attempt to clean up cursor from the last retry (if applicable). + killOldQueryCursor(); + runQuery(); waitForDatabaseWorkToComplete(); // We want to free the _collLoader regardless of whether the commit succeeds. 
std::unique_ptr<CollectionBulkLoader> loader = std::move(_collLoader); @@ -193,6 +193,49 @@ BaseCloner::AfterStageBehavior CollectionCloner::queryStage() { return kContinueNormally; } +void CollectionCloner::runQuery() { + // Non-resumable query. + Query query = QUERY("query" << BSONObj() << "$readOnce" << true); + + if (_resumeSupported) { + if (_resumeToken) { + // Resume the query from where we left off. + query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken" + << true << "$_resumeAfter" << _resumeToken.get()); + } else { + // New attempt at a resumable query. + query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken" + << true); + } + } + + // We reset this every time we retry or resume a query. + // We distinguish the first batch from the rest so that we only store the remote cursor id + // the first time we get it. + _firstBatchOfQueryRound = true; + + try { + getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, + _sourceDbAndUuid, + query, + nullptr /* fieldsToReturn */, + QueryOption_NoCursorTimeout | QueryOption_SlaveOk | + (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), + _collectionClonerBatchSize); + } catch (...) { + auto status = exceptionToStatus(); + + if (!_resumeSupported) { + std::string message = str::stream() + << "Collection clone failed and is not resumable. nss: " << _sourceNss; + log() << message; + uasserted(ErrorCodes::InitialSyncFailure, message); + } + + throw; + } +} + void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { { stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); @@ -205,6 +248,12 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { } } + if (_firstBatchOfQueryRound && _resumeSupported) { + // Store the cursorId of the remote cursor. 
+ _remoteCursorId = iter.getCursorId(); + } + _firstBatchOfQueryRound = false; + { stdx::lock_guard<Latch> lk(_mutex); _stats.receivedBatches++; @@ -224,6 +273,11 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { uassertStatusOK(newStatus); } + if (_resumeSupported) { + // Store the resume token for this batch. + _resumeToken = iter.getPostBatchResumeToken(); + } + initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf( [&](const BSONObj&) { while (MONGO_unlikely( @@ -288,6 +342,28 @@ void CollectionCloner::waitForDatabaseWorkToComplete() { _dbWorkTaskRunner.join(); } +void CollectionCloner::killOldQueryCursor() { + // No cursor stored. Do nothing. + if (_remoteCursorId == -1) { + return; + } + + BSONObj infoObj; + auto nss = _sourceNss; + auto id = _remoteCursorId; + + auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id)); + + try { + getClient()->runCommand(nss.db().toString(), cmdObj, infoObj); + } catch (...) { + log() << "Error while trying to kill remote cursor after transient query error"; + } + + // Clear the stored cursorId on success. 
+ _remoteCursorId = -1; +} + CollectionCloner::Stats CollectionCloner::getStats() const { stdx::lock_guard<Latch> lk(_mutex); return _stats; diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 31a469bffbe..764fcefb6dd 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -132,6 +132,18 @@ private: AfterStageBehavior run() override; }; + class CollectionClonerQueryStage : public CollectionClonerStage { + public: + CollectionClonerQueryStage(std::string name, + CollectionCloner* cloner, + ClonerRunFn stageFunc) + : CollectionClonerStage(name, cloner, stageFunc) {} + + bool isTransientError(const Status& status) override { + return ErrorCodes::isRetriableError(status); + } + }; + std::string describeForFuzzer(BaseClonerStage* stage) const final { return _sourceNss.db() + " db: { " + stage->getName() + ": UUID(\"" + _sourceDbAndUuid.uuid()->toString() + "\") coll: " + _sourceNss.coll() + " }"; @@ -186,6 +198,18 @@ private: */ void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd); + /** + * Sends a query command to the source. That query command with be parameterized based on + * wire version and clone progress. + */ + void runQuery(); + + /** + * Attempts to clean up the cursor on the upstream node. This is called any time we + * receive a transient error during the query stage. + */ + void killOldQueryCursor(); + // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // @@ -203,7 +227,7 @@ private: CollectionClonerStage _countStage; // (R) CollectionClonerStage _listIndexesStage; // (R) CollectionClonerStage _createCollectionStage; // (R) - CollectionClonerStage _queryStage; // (R) + CollectionClonerQueryStage _queryStage; // (R) ProgressMeter _progressMeter; // (X) progress meter for this instance. 
std::vector<BSONObj> _indexSpecs; // (X) Except for _id_ @@ -217,6 +241,18 @@ private: // Putting _dbWorkTaskRunner last ensures anything the database work threads depend on, // like _documentsToInsert, is destroyed after those threads exit. TaskRunner _dbWorkTaskRunner; // (R) + + // Does the sync source support resumable queries? (wire version 4.4+) + bool _resumeSupported = false; // (X) + + // The resumeToken used to resume after network error. + boost::optional<BSONObj> _resumeToken; // (X) + + // The cursorId of the remote collection cursor. + long long _remoteCursorId = -1; // (X) + + // If true, it means we are starting a new query or resuming an interrupted one. + bool _firstBatchOfQueryRound = true; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 78ec583b37a..b6c4ce672d8 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -78,6 +78,8 @@ protected: }; _storageInterface.createCollectionForBulkFn = _standardCreateCollectionFn; + _mockClient->setWireVersions(WireVersion::PLACEHOLDER_FOR_44, + WireVersion::PLACEHOLDER_FOR_44); _mockServer->assignCollectionUuid(_nss.ns(), _collUuid); _mockServer->setCommandReply("replSetGetRBID", BSON("ok" << 1 << "rbid" << _sharedData->getRollBackId())); @@ -480,5 +482,628 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_EQ(collNss, _nss); } +TEST_F(CollectionClonerTest, NonResumableQuerySuccess) { + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. 
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + + ASSERT_OK(cloner->run()); + + ASSERT_EQUALS(3, _collectionStats->insertCount); + ASSERT_TRUE(_collectionStats->commitCalled); + auto stats = cloner->getStats(); + ASSERT_EQUALS(3u, stats.documentsCopied); +} + +TEST_F(CollectionClonerTest, NonResumableQueryFailure) { + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + + // Run the cloner in a separate thread. 
+ stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status); + ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable"); + }); + + // Wait until we get to the query stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + // Let us begin with the query stage. + beforeStageFailPoint->setMode(FailPoint::off, 0); + + clonerThread.join(); +} + +// We will retry our query without having yet obtained a resume token. +TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyBeforeFirstBatchRetrySuccess) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + // Preliminary setup for failpoints. + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto cloner = makeCollectionCloner(); + + // Run the cloner in a separate thread. 
+ stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait until we get to the query stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. We will fail right before our first batch. + _mockServer->shutdown(); + + // Let the cloner retry and wait until just before it. + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry); + + // Verify we haven't been able to receive anything yet. + auto stats = cloner->getStats(); + ASSERT_EQUALS(0, stats.receivedBatches); + + // Bring the server back up. + _mockServer->reboot(); + + // Let the retry commence. + beforeRetryFailPoint->setMode(FailPoint::off, 0); + + clonerThread.join(); + + // Check that we've received all the data. + ASSERT_EQUALS(3, _collectionStats->insertCount); + ASSERT_TRUE(_collectionStats->commitCalled); + stats = cloner->getStats(); + ASSERT_EQUALS(3u, stats.documentsCopied); +} + +// We will resume our query using the resume token we stored after receiving the first batch. +TEST_F(CollectionClonerTest, ResumableQueryFailTransientlyAfterFirstBatchRetrySuccess) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("killCursors", fromjson("{ok:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(5)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. 
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + _mockServer->insert(_nss.ns(), BSON("_id" << 4)); + _mockServer->insert(_nss.ns(), BSON("_id" << 5)); + + // Preliminary setup for hanging failpoint. + auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait for us to process the first batch. + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once (transiently). + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}")); + + // Let the query stage finish. + afterBatchFailpoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // Since the CollectionMockStats class does not de-duplicate inserts, it is possible to insert + // the same document more than once, thereby also increasing the insertCount more than once. + // This means that here insertCount=5 is evidence that we correctly resumed our query where we + // left off (2 inserts in) instead of retrying the whole query (that leads to insertCount=7). 
+ ASSERT_EQUALS(5, _collectionStats->insertCount); + ASSERT_TRUE(_collectionStats->commitCalled); + stats = cloner->getStats(); + ASSERT_EQUALS(5u, stats.documentsCopied); +} + +TEST_F(CollectionClonerTest, ResumableQueryNonRetriableError) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("killCursors", fromjson("{ok:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::UnknownError, status); + }); + + // Wait until we get to the query stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Verify we've made no progress yet. + auto stats = cloner->getStats(); + ASSERT_EQUALS(0, stats.receivedBatches); + + // This will cause the next batch to fail once, non-transiently. + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}")); + + // Let us begin with the query stage. 
+ beforeStageFailPoint->setMode(FailPoint::off, 0); + + clonerThread.join(); +} + +TEST_F(CollectionClonerTest, ResumableQueryFailNonTransientlyAfterProgressMadeCannotRetry) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("killCursors", fromjson("{ok:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::UnknownError, status); + }); + + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once, non-transiently. + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}")); + + // Allow the cloner to attempt (and fail) the next batch. 
+ afterBatchFailpoint->setMode(FailPoint::off, 0); + + clonerThread.join(); +} + +TEST_F(CollectionClonerTest, ResumableQueryKillCursorsNetworkError) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + // Preliminary setup for hanging failpoint. + auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait for us to process the first batch. + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once (transiently). + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}")); + + // Prepare the network error response to 'killCursors'. + // This will cause us to retry the query stage. + _mockServer->setCommandReply("killCursors", + Status{ErrorCodes::HostUnreachable, "HostUnreachable for test"}); + + // Let the query stage finish. 
+ afterBatchFailpoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + ASSERT_EQUALS(3, _collectionStats->insertCount); + ASSERT_TRUE(_collectionStats->commitCalled); + stats = cloner->getStats(); + ASSERT_EQUALS(3u, stats.documentsCopied); +} + +TEST_F(CollectionClonerTest, ResumableQueryKillCursorsOtherError) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + // Preliminary setup for hanging failpoint. + auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait for us to process the first batch. + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once (transiently). + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}")); + + // Prepare the network error response to 'killCursors'. 
+    // This will cause us to ignore the failed 'killCursors' and resume the query.
+    _mockServer->setCommandReply("killCursors",
+                                 Status{ErrorCodes::UnknownError, "UnknownError for test"});
+
+    // Let the query stage finish.
+    afterBatchFailpoint->setMode(FailPoint::off, 0);
+    clonerThread.join();
+
+    // We ignored the 'killCursors' request and just resumed the query. We should have cloned every
+    // document exactly once.
+    ASSERT_EQUALS(3, _collectionStats->insertCount);
+    ASSERT_TRUE(_collectionStats->commitCalled);
+    stats = cloner->getStats();
+    ASSERT_EQUALS(3u, stats.documentsCopied);
+}
+
+// We retry the query after a transient error and we immediately encounter a non-retriable one.
+TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAtRetry) {
+    _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+    _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+    // Set up data for preliminary stages
+    auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+                                << "_id_");
+    _mockServer->setCommandReply("count", createCountResponse(5));
+    _mockServer->setCommandReply("listIndexes",
+                                 createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+    // Set up documents to be returned from upstream node.
+    _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+    _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+    _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+    // Preliminary setup for hanging failpoints. 
+ auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::UnknownError, status); + }); + + // Wait for us to process the first batch. + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once (transiently). + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}")); + + afterBatchFailpoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); + + // Follow-up the transient error with a non-retriable one. + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}")); + + beforeRetryFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + // We only made it one batch in before failing. + stats = cloner->getStats(); + ASSERT_EQUALS(1u, stats.receivedBatches); +} + +// We retry the query after a transient error, we make a bit more progress and then we encounter +// a non-retriable one. 
+TEST_F(CollectionClonerTest, ResumableQueryNonTransientErrorAfterPastRetry) { + _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}")); + _mockServer->setCommandReply("killCursors", fromjson("{ok:1}")); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(5)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + _mockServer->insert(_nss.ns(), BSON("_id" << 4)); + _mockServer->insert(_nss.ns(), BSON("_id" << 5)); + + // Preliminary setup for hanging failpoints. + auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::UnknownError, status); + }); + + // Wait for us to process the first batch. + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once (transiently). 
+    auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+    failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}"));
+
+    afterBatchFailpoint->setMode(FailPoint::off, 0);
+    beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+    // Do a quick failpoint dance so we clone one more batch before failing.
+    timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0);
+    beforeRetryFailPoint->setMode(FailPoint::off, 0);
+    afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1);
+
+    // Follow-up the transient error with a non-retriable one.
+    failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
+
+    afterBatchFailpoint->setMode(FailPoint::off, 0);
+    clonerThread.join();
+
+    // We made it two batches in before failing.
+    stats = cloner->getStats();
+    ASSERT_EQUALS(2u, stats.receivedBatches);
+}
+
+// We resume a query, receive some more data, then get a transient error again. The goal of this
+// test is to make sure we don't forget to request the _next_ resume token when resuming a query.
+TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) {
+
+    /**
+     * Test runs like so:
+     *
+     * |___batch___| . |___batch___| |___batch___| . |batch|
+     *               |                             |
+     *            resume 1                      resume 2
+     */
+
+    _mockServer->setCommandReply("replSetGetRBID", fromjson("{ok:1, rbid:1}"));
+    _mockServer->setCommandReply("killCursors", fromjson("{ok:1}"));
+
+    // Set up data for preliminary stages
+    auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+                                << "_id_");
+    _mockServer->setCommandReply("count", createCountResponse(5));
+    _mockServer->setCommandReply("listIndexes",
+                                 createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+    // Set up documents to be returned from upstream node. 
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + _mockServer->insert(_nss.ns(), BSON("_id" << 4)); + _mockServer->insert(_nss.ns(), BSON("_id" << 5)); + _mockServer->insert(_nss.ns(), BSON("_id" << 6)); + _mockServer->insert(_nss.ns(), BSON("_id" << 7)); + + // Preliminary setup for hanging failpoints. + auto afterBatchFailpoint = + globalFailPointRegistry().find("initialSyncHangCollectionClonerAfterHandlingBatchResponse"); + auto timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait for us to process the first batch. + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + + // Verify we've only managed to store one batch. + auto stats = cloner->getStats(); + ASSERT_EQUALS(1, stats.receivedBatches); + + // This will cause the next batch to fail once (transiently). + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'HostUnreachable'}")); + + afterBatchFailpoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); + + // Allow copying two more batches before the next error. + // It is important that the resumes come after differing amounts of progress, so that we can + // more easily distinguish error scenarios based on document count. 
(see end of test) + failNextBatch->setMode(FailPoint::skip, 2, fromjson("{errorType: 'HostUnreachable'}")); + + // Do a failpoint dance so we can get to the next retry. + timesEnteredAfterBatch = afterBatchFailpoint->setMode(FailPoint::alwaysOn, 0); + beforeRetryFailPoint->setMode(FailPoint::off, 0); + afterBatchFailpoint->waitForTimesEntered(timesEnteredAfterBatch + 1); + timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + afterBatchFailpoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); + + // Allow the clone to finish. + failNextBatch->setMode(FailPoint::off, 0); + beforeRetryFailPoint->setMode(FailPoint::off, 0); + + clonerThread.join(); + + /** + * Since the CollectionMockStats class does not de-duplicate inserts, it is possible to insert + * the same document more than once, thereby also increasing the insertCount more than once. + * We can therefore infer the resume history from the insertCount. 
In this test: + * - insertCount = 7: all the resumes were correct and we got every doc exactly once + * - this is the correct result + * - insertCount = 9: the first resume retried instead of resuming (second resume was correct) + * - insertCount = 11: the second resume used the first resume token instead of the second one + * - we test that we avoid this result + * - insertCount = 13: the second resume retried instead of resuming (first one was correct) + */ + + ASSERT_EQUALS(7, _collectionStats->insertCount); + ASSERT_TRUE(_collectionStats->commitCalled); + stats = cloner->getStats(); + ASSERT_EQUALS(7u, stats.documentsCopied); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index 0e440b47168..c867767b612 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -53,6 +53,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver_network', + '$BUILD_DIR/mongo/client/dbclient_mockcursor', '$BUILD_DIR/mongo/db/repl/replica_set_messages' ], ) diff --git a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp index 23f5af622eb..79afb8861f8 100644 --- a/src/mongo/dbtests/mock/mock_dbclient_connection.cpp +++ b/src/mongo/dbtests/mock/mock_dbclient_connection.cpp @@ -112,8 +112,48 @@ std::unique_ptr<mongo::DBClientCursor> MockDBClientConnection::query( queryOptions, batchSize)); + BSONArray resultsInCursor; + + // A simple mock implementation of a resumable query, where we skip the first 'n' fields + // where 'n' is given by the mock resume token. + auto nToSkip = 0; + auto queryBson = fromjson(query.toString()); + if (queryBson.hasField("$_resumeAfter")) { + if (queryBson["$_resumeAfter"].Obj().hasField("n")) { + nToSkip = queryBson["$_resumeAfter"]["n"].numberInt(); + } + } + + bool provideResumeToken = false; + if (queryBson.hasField("$_requestResumeToken")) { + provideResumeToken = true; + } + + // Resume query. 
+ if (nToSkip != 0) { + BSONObjIterator iter(result); + BSONArrayBuilder builder; + auto numExamined = 0; + + while (iter.more()) { + numExamined++; + + if (numExamined < nToSkip + 1) { + iter.next(); + continue; + } + + builder.append(iter.next().Obj()); + } + resultsInCursor = BSONArray(builder.obj()); + } else { + // Yield all results instead (default). + resultsInCursor = BSONArray(result.copy()); + } + std::unique_ptr<mongo::DBClientCursor> cursor; - cursor.reset(new DBClientMockCursor(this, BSONArray(result.copy()), batchSize)); + cursor.reset(new DBClientMockCursor( + this, BSONArray(resultsInCursor), provideResumeToken, batchSize)); return cursor; } catch (const mongo::DBException&) { _isFailed = true; |