diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-05 20:14:59 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-10-07 16:09:59 -0400 |
commit | 44957419b2dd4e6b3f7be8b817a7fcd8c71d643d (patch) | |
tree | 636aecaf55d384684114cbba97c2b6725774a523 | |
parent | 8410a7cd7d145d5923f9b5ebc9672e612e6fcc1d (diff) | |
download | mongo-44957419b2dd4e6b3f7be8b817a7fcd8c71d643d.tar.gz |
SERVER-26520 CollectionCloner fetches document count for progress tracking from sync source before copying documents
-rw-r--r-- | jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 104 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 158 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 134 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/databases_cloner_test.cpp | 6 |
10 files changed, 382 insertions, 77 deletions
diff --git a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js index 94664964bde..64e33014b4c 100644 --- a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js +++ b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js @@ -86,7 +86,8 @@ assert.eq(res.initialSyncStatus.databases.databasesCloned, 2); assert.eq(res.initialSyncStatus.databases.test.collections, 1); assert.eq(res.initialSyncStatus.databases.test.clonedCollections, 1); - assert.eq(res.initialSyncStatus.databases.test["test.foo"].documents, 4); + assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsToCopy, 4); + assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsCopied, 4); assert.eq(res.initialSyncStatus.databases.test["test.foo"].indexes, 1); assert.eq(res.initialSyncStatus.databases.test["test.foo"].fetchedBatches, 1); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 8eec05c66a1..d92b0575b0a 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -760,10 +760,7 @@ env.Library( ], LIBDEPS=[ 'replmocks', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/commands_test_crutch', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', - '$BUILD_DIR/mongo/db/service_context_noop_init', ], ) @@ -776,9 +773,11 @@ env.Library( 'task_runner', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/fetcher', + '$BUILD_DIR/mongo/client/remote_command_retry_scheduler', '$BUILD_DIR/mongo/db/catalog/collection_options', '$BUILD_DIR/mongo/db/catalog/document_validation', '$BUILD_DIR/mongo/executor/task_executor_interface', + '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/util/progress_meter', ], ) @@ -789,6 +788,10 @@ env.CppUnitTest( LIBDEPS=[ 'collection_cloner', 'base_cloner_test_fixture', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/commands_test_crutch', + '$BUILD_DIR/mongo/db/service_context_noop_init', + '$BUILD_DIR/mongo/unittest/task_executor_proxy', ], ) @@ -810,6 +813,9 @@ env.CppUnitTest( LIBDEPS=[ 'database_cloner', 'base_cloner_test_fixture', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/commands_test_crutch', + '$BUILD_DIR/mongo/db/service_context_noop_init', ], ) @@ -1004,6 +1010,7 @@ env.CppUnitTest( 'data_replicator_test.cpp', ], LIBDEPS=[ + 'base_cloner_test_fixture', 'data_replicator', 'data_replicator_external_state_mock', 'replication_executor_test_fixture', diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 399be148782..59c8ff3c0f0 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -50,6 +50,11 @@ const BSONObj BaseClonerTest::idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << nss.ns()); // static +BSONObj BaseClonerTest::createCountResponse(int documentCount) { + return BSON("n" << documentCount << "ok" << 1); +} + +// static BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const std::string& ns, const BSONArray& docs, diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index 66943f15214..fb8ecf22861 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -58,6 +58,11 @@ public: typedef executor::NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator; /** + * Creates a count response with given document count. + */ + static BSONObj createCountResponse(int documentCount); + + /** * Creates a cursor response with given array of documents. */ static BSONObj createCursorResponse(CursorId cursorId, diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 1db815a789f..aeb2f61da63 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -32,12 +32,15 @@ #include "mongo/db/repl/collection_cloner.h" +#include "mongo/base/string_data.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/server_parameters.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" @@ -50,12 +53,16 @@ namespace { using LockGuard = stdx::lock_guard<stdx::mutex>; using UniqueLock = stdx::unique_lock<stdx::mutex>; +constexpr auto kCountResponseDocumentCountFieldName = "n"_sd; + const int kProgressMeterSecondsBetween = 60; const int kProgressMeterCheckInterval = 128; // The batchSize to use for the query to get all documents from the collection. // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = batchSize to use. const auto batchSize = (16 * 1024 * 1024) / 12 * 10; +// The number of attempts for the count command, which gets the document count. +MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionCountAttempts, int, 3); // The number of attempts for the listIndexes commands. MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListIndexesAttempts, int, 3); // The number of attempts for the find command, which gets the data. @@ -78,6 +85,18 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _onCompletion(onCompletion), _storageInterface(storageInterface), _active(false), + _countScheduler(_executor, + RemoteCommandRequest(_source, + _sourceNss.db().toString(), + BSON("count" << _sourceNss.coll()), + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + nullptr, + RemoteCommandRequest::kNoTimeout), + stdx::bind(&CollectionCloner::_countCallback, this, stdx::placeholders::_1), + RemoteCommandRetryScheduler::makeRetryPolicy( + numInitialSyncCollectionCountAttempts, + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::kAllRetriableErrors)), _listIndexesFetcher(_executor, _source, _sourceNss.db().toString(), @@ -124,7 +143,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, _dbWorkTaskRunner.schedule(task); return executor::TaskExecutor::CallbackHandle(); }), - _progressMeter(1U, + _progressMeter(1U, // total will be replaced with count command result. kProgressMeterSecondsBetween, kProgressMeterCheckInterval, "documents copied", @@ -138,10 +157,6 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor, uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface); _stats.ns = _sourceNss.ns(); - // Hide collection size in progress output because this information is not available. - // Additionally, even if the collection size is known, it may change while we are copying the - // documents from the sync source. - _progressMeter.showTotal(false); } CollectionCloner::~CollectionCloner() { @@ -181,7 +196,7 @@ Status CollectionCloner::startup() { } _stats.start = _executor->now(); - Status scheduleResult = _listIndexesFetcher.schedule(); + Status scheduleResult = _countScheduler.startup(); if (!scheduleResult.isOK()) { return scheduleResult; } @@ -196,6 +211,7 @@ void CollectionCloner::shutdown() { return; } + _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); _findFetcher.shutdown(); _dbWorkTaskRunner.cancel(); @@ -223,6 +239,75 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched _scheduleDbWorkFn = scheduleDbWorkFn; } +void CollectionCloner::_countCallback( + const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + + // No need to reword status reason in the case of cancellation. + if (ErrorCodes::CallbackCanceled == args.response.status) { + _finishCallback(args.response.status); + return; + } + + if (!args.response.status.isOK()) { + _finishCallback({args.response.status.code(), + str::stream() << "During count call on collection '" << _sourceNss.ns() + << "' from " + << _source.toString() + << ", there was an error '" + << args.response.status.reason() + << "'"}); + return; + } + + Status commandStatus = getStatusFromCommandResult(args.response.data); + if (!commandStatus.isOK()) { + _finishCallback({commandStatus.code(), + str::stream() << "During count call on collection '" << _sourceNss.ns() + << "' from " + << _source.toString() + << ", there was a command error '" + << commandStatus.reason() + << "'"}); + return; + } + + long long count = 0; + auto countStatus = + bsonExtractIntegerField(args.response.data, kCountResponseDocumentCountFieldName, &count); + if (!countStatus.isOK()) { + _finishCallback({countStatus.code(), + str::stream() << "There was an error parsing document count from count " + "command result on collection " + << _sourceNss.ns() + << " from " + << _source.toString() + << ": " + << countStatus.reason()}); + return; + } + + if (count < 0) { + _finishCallback({ErrorCodes::BadValue, + str::stream() << "Count call on collection " << _sourceNss.ns() << " from " + << _source.toString() + << " returned negative document count: " + << count}); + return; + } + + { + LockGuard lk(_mutex); + _stats.documentToCopy = count; + _progressMeter.setTotalWhileRunning(static_cast<unsigned long long>(count)); + } + + auto scheduleStatus = _listIndexesFetcher.schedule(); + if (!scheduleStatus.isOK()) { + _finishCallback(scheduleStatus); + return; + } +} + void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { @@ -394,7 +479,7 @@ void CollectionCloner::_insertDocumentsCallback(const executor::TaskExecutor::Ca } _documents.swap(docs); - _stats.documents += docs.size(); + _stats.documentsCopied += docs.size(); ++_stats.fetchBatches; _progressMeter.hit(int(docs.size())); invariant(_collLoader); @@ -444,6 +529,8 @@ void CollectionCloner::_finishCallback(const Status& status) { LOG(1) << " collection: " << _destNss << ", stats: " << _stats.toString(); } +constexpr StringData CollectionCloner::Stats::kDocumentsToCopyFieldName; +constexpr StringData CollectionCloner::Stats::kDocumentsCopiedFieldName; std::string CollectionCloner::Stats::toString() const { return toBSON().toString(); @@ -457,7 +544,8 @@ BSONObj CollectionCloner::Stats::toBSON() const { } void CollectionCloner::Stats::append(BSONObjBuilder* builder) const { - builder->appendNumber("documents", documents); + builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy); + builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied); builder->appendNumber("indexes", indexes); builder->appendNumber("fetchedBatches", fetchBatches); if (start != Date_t()) { diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index fd6e4527b0f..005156a55a9 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -34,8 +34,10 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" +#include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/fetcher.h" +#include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/base_cloner.h" @@ -62,10 +64,14 @@ class CollectionCloner : public BaseCloner { public: struct Stats { + static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd; + static constexpr StringData kDocumentsCopiedFieldName = "documentsCopied"_sd; + std::string ns; Date_t start; Date_t end; - size_t documents{0}; + size_t documentToCopy{0}; + size_t documentsCopied{0}; size_t indexes{0}; size_t fetchBatches{0}; @@ -135,6 +141,11 @@ public: private: /** + * Read number of documents in collection from count result. + */ + void _countCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& args); + + /** * Read index specs from listIndexes result. */ void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, @@ -197,12 +208,13 @@ private: CallbackFn _onCompletion; // (R) Invoked once when cloning completes or fails. StorageInterface* _storageInterface; // (R) Not owned by us. bool _active; // (M) true when Collection Cloner is started. - Fetcher _listIndexesFetcher; // (S) - Fetcher _findFetcher; // (S) - std::vector<BSONObj> _indexSpecs; // (M) - BSONObj _idIndexSpec; // (M) - std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert. - TaskRunner _dbWorkTaskRunner; // (R) + RemoteCommandRetryScheduler _countScheduler; // (S) + Fetcher _listIndexesFetcher; // (S) + Fetcher _findFetcher; // (S) + std::vector<BSONObj> _indexSpecs; // (M) + BSONObj _idIndexSpec; // (M) + std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert. + TaskRunner _dbWorkTaskRunner; // (R) ScheduleDbWorkFn _scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor. Stats _stats; // (M) stats for this instance. diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 6c31428d2bd..3e87c8be2b2 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" #include "mongo/util/mongoutils/str.h" @@ -178,12 +179,146 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) { NetworkOperationIterator noi = net->getNextReadyRequest(); auto&& noiRequest = noi->getRequest(); ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); - ASSERT_EQUALS("listIndexes", std::string(noiRequest.cmdObj.firstElementFieldName())); + ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName())); ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); ASSERT_FALSE(net->hasReadyRequests()); ASSERT_TRUE(collectionCloner->isActive()); } +TEST_F(CollectionClonerTest, CollectionClonerSetsDocumentCountInStatsFromCountCommandResult) { + ASSERT_OK(collectionCloner->startup()); + + ASSERT_EQUALS(0U, collectionCloner->getStats().documentToCopy); + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(100)); + } + getExecutor().shutdown(); + collectionCloner->join(); + ASSERT_EQUALS(100U, collectionCloner->getStats().documentToCopy); +} + +TEST_F(CollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) { + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(ErrorCodes::OperationFailed, ""); + } + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); +} + +TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) { + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(BSON("ok" << 0 << "errmsg" + << "count error" + << "code" + << int(ErrorCodes::OperationFailed))); + } + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); + ASSERT_STRING_CONTAINS(getStatus().reason(), "count error"); +} + +TEST_F(CollectionClonerTest, CollectionClonerResendsCountCommandOnRetriableError) { + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(ErrorCodes::HostNotFound, ""); + processNetworkResponse(ErrorCodes::NetworkTimeout, ""); + processNetworkResponse(createCountResponse(100)); + } + getExecutor().shutdown(); + collectionCloner->join(); + ASSERT_EQUALS(100U, collectionCloner->getStats().documentToCopy); +} + +TEST_F(CollectionClonerTest, CollectionClonerReturnsLastRetriableErrorOnExceedingCountAttempts) { + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(ErrorCodes::HostNotFound, ""); + processNetworkResponse(ErrorCodes::NetworkTimeout, ""); + processNetworkResponse(ErrorCodes::NotMaster, ""); + } + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::NotMaster, getStatus()); +} + +TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) { + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(BSON("ok" << 1)); + } + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, getStatus()); +} + +TEST_F(CollectionClonerTest, CollectionClonerReturnsBadValueOnNegativeDocumentCount) { + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(-1)); + } + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::BadValue, getStatus()); +} + +class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy { +public: + using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>; + + TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor, + ShouldFailRequestFn shouldFailRequest) + : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} + + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override { + if (_shouldFailRequest(request)) { + return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); + } + return getExecutor()->scheduleRemoteCommand(request, cb); + } + +private: + ShouldFailRequestFn _shouldFailRequest; +}; + +TEST_F(CollectionClonerTest, + CollectionClonerReturnsScheduleErrorOnFailingToScheduleListIndexesCommand) { + TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy( + &getExecutor(), [](const executor::RemoteCommandRequest& request) { + return str::equals("listIndexes", request.cmdObj.firstElementFieldName()); + }); + + collectionCloner = stdx::make_unique<CollectionCloner>( + &_executorProxy, + dbWorkThreadPool.get(), + target, + nss, + options, + stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), + storageInterface.get()); + + ASSERT_OK(collectionCloner->startup()); + + { + executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(100)); + } + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); +} + TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { options.reset(); options.autoIndexId = CollectionOptions::NO; @@ -216,6 +351,7 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSONArray())); } ASSERT_TRUE(collectionCloner->isActive()); @@ -249,6 +385,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) { // the cloner stops the fetcher from retrieving more results. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(1, BSONArray())); } @@ -278,6 +415,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) { // the cloner stops the fetcher from retrieving more results. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(ErrorCodes::NamespaceNotFound, "The collection doesn't exist."); } @@ -316,6 +454,7 @@ TEST_F(CollectionClonerTest, // the cloner stops the fetcher from retrieving more results. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(ErrorCodes::NamespaceNotFound, "The collection doesn't exist."); } @@ -337,6 +476,7 @@ TEST_F(CollectionClonerTest, BeginCollectionScheduleDbWorkFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -362,6 +502,7 @@ TEST_F(CollectionClonerTest, BeginCollectionCallbackCanceled) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -382,6 +523,7 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -424,6 +566,7 @@ TEST_F(CollectionClonerTest, BeginCollection) { // First batch contains the _id_ index spec. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(idIndexSpec))); } @@ -474,6 +617,7 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -501,6 +645,7 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -525,6 +670,7 @@ TEST_F(CollectionClonerTest, FindCommandFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -549,6 +695,7 @@ TEST_F(CollectionClonerTest, FindCommandCanceled) { ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -586,6 +733,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -613,6 +761,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -646,6 +795,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -679,6 +829,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -709,6 +860,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -753,6 +905,7 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -805,6 +958,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -866,6 +1020,7 @@ TEST_F(CollectionClonerTest, CollectionClonerCanBeRestartedAfterPreviousFailure) { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); @@ -908,6 +1063,7 @@ TEST_F(CollectionClonerTest, CollectionClonerCanBeRestartedAfterPreviousFailure) { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 69994599db2..4fe4426b60a 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -689,54 +689,59 @@ TEST_F(InitialSyncTest, Complete) { * */ - const Responses responses = + auto lastOpAfterClone = BSON( + "ts" << Timestamp(Seconds(8), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion << "ns" + << "" + << "op" + << "i" + << "o" + << BSON("_id" << 5 << "a" << 2)); + + const Responses responses = { + {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, + // get latest oplog ts + {"find", + fromjson( + str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}")}, + // oplog fetcher find + {"find", + fromjson( + str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" + "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" + << OplogEntry::kOplogVersion + << ", op:'i', o:{_id:1, a:1}}]}}")}, + // Clone Start + // listDatabases + {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, + // listCollections for "a" + {"listCollections", + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" + "{name:'a', options:{}} " + "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, + // listIndexes:a { - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // oplog fetcher find - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:1, a:1}}]}}")}, - // Clone Start - // listDatabases - {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")}, - // listCollections for "a" - {"listCollections", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" - "{name:'a', options:{}} " - "]}}")}, - // listIndexes:a - {"listIndexes", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" - "{v:" - << OplogEntry::kOplogVersion - << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, - // find:a - {"find", - fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" - "{_id:1, a:1} " - "]}}")}, - // Clone Done - // get latest oplog ts - {"find", - fromjson(str::stream() - << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:[" - "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:" - << OplogEntry::kOplogVersion - << ", op:'i', o:{_id:5, a:2}}]}}")}, - {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, - // Applier starts ... - }; + "listIndexes", + fromjson(str::stream() + << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:[" + "{v:" + << OplogEntry::kOplogVersion + << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")}, + // find:a + {"find", + fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:[" + "{_id:1, a:1} " + "]}}")}, + // Clone Done + // get latest oplog ts + {"find", BaseClonerTest::createCursorResponse(0, BSON_ARRAY(lastOpAfterClone))}, + {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")}, + // Applier starts ... + }; // Initial sync flag should not be set before starting. auto txn = makeOpCtx(); @@ -746,7 +751,7 @@ TEST_F(InitialSyncTest, Complete) { // Play first response to ensure data replicator has entered initial sync state. setResponses({responses.begin(), responses.begin() + 1}); - numGetMoreOplogEntriesMax = 6; + numGetMoreOplogEntriesMax = responses.size(); playResponses(); // Initial sync flag should be set. @@ -773,9 +778,7 @@ TEST_F(InitialSyncTest, Complete) { ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get())); // getMore responses are generated by playResponses(). - ASSERT_EQUALS(OpTime(Timestamp(7, 1), OpTime::kUninitializedTerm), - OplogEntry(lastGetMoreOplogEntry).getOpTime()); - ASSERT_EQUALS(OplogEntry(lastGetMoreOplogEntry).getOpTime(), _myLastOpTime); + ASSERT_EQUALS(OplogEntry(lastOpAfterClone).getOpTime(), _myLastOpTime); } TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) { @@ -804,6 +807,8 @@ TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCl fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -939,6 +944,8 @@ TEST_F(InitialSyncTest, FailOnRollback) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -965,7 +972,7 @@ TEST_F(InitialSyncTest, FailOnRollback) { }; startSync(1); - numGetMoreOplogEntriesMax = 5; + numGetMoreOplogEntriesMax = responses.size(); setResponses(responses); playResponses(); verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError); @@ -998,6 +1005,8 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError) fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -1023,7 +1032,7 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError) }; startSync(1); - numGetMoreOplogEntriesMax = 5; + numGetMoreOplogEntriesMax = responses.size(); setResponses(responses); playResponses(); getExecutor().shutdown(); @@ -1101,6 +1110,8 @@ TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -1142,7 +1153,7 @@ TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) { startSync(1); - numGetMoreOplogEntriesMax = 10; + numGetMoreOplogEntriesMax = responses.size(); setResponses({responses.begin(), responses.end() - 4}); playResponses(); log() << "done playing first responses"; @@ -1182,6 +1193,8 @@ TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -1208,7 +1221,7 @@ TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) { startSync(2); - numGetMoreOplogEntriesMax = 6; + numGetMoreOplogEntriesMax = responses.size(); setResponses(responses); playResponses(); log() << "done playing first responses"; @@ -1277,6 +1290,8 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 5 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -1327,7 +1342,7 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { // Play first 2 responses to ensure data replicator has started the oplog fetcher. setResponses({failedResponses.begin(), failedResponses.begin() + 2}); - numGetMoreOplogEntriesMax = 10; + numGetMoreOplogEntriesMax = failedResponses.size() + successfulResponses.size(); playResponses(); log() << "Done playing first failed response"; @@ -1397,7 +1412,12 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) { ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress; ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress; auto collectionProgress = dbProgress.getObjectField("a.a"); - ASSERT_EQUALS(5, collectionProgress.getIntField("documents")) << collectionProgress; + ASSERT_EQUALS( + 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsToCopyFieldName)) + << collectionProgress; + ASSERT_EQUALS( + 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName)) + << collectionProgress; ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress; ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress; diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 94e93be5be4..e5cb2b464e2 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -540,6 +540,7 @@ TEST_F(DatabaseClonerTest, StartSecondCollectionClonerFailed) { << "options" << BSONObj())))); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); processNetworkResponse(createCursorResponse(0, BSONArray())); } @@ -571,11 +572,13 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) { // This affects the order of the network responses. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(BSON("ok" << 0 << "errmsg" << "fake message" << "code" << ErrorCodes::CursorNotFound)); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); processNetworkResponse(createCursorResponse(0, BSONArray())); } @@ -621,6 +624,7 @@ TEST_F(DatabaseClonerTest, CreateCollections) { // This affects the order of the network responses. { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(_databaseCloner->isActive()); @@ -632,6 +636,7 @@ TEST_F(DatabaseClonerTest, CreateCollections) { { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); + processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(_databaseCloner->isActive()); diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 110db06f9d8..740a194e490 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -628,6 +628,8 @@ TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a { "listIndexes", @@ -657,6 +659,8 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:[" "{name:'a', options:{}} " "]}}")}, + // count:a + {"count", BSON("n" << 1 << "ok" << 1)}, // listIndexes:a {"listIndexes", fromjson(str::stream() @@ -674,6 +678,8 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) { fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.$cmd.listCollections', firstBatch:[" "{name:'b', options:{}} " "]}}")}, + // count:b + {"count", BSON("n" << 2 << "ok" << 1)}, // listIndexes:b {"listIndexes", fromjson(str::stream() |