summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js3
-rw-r--r--src/mongo/db/repl/SConscript13
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp9
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.h5
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp104
-rw-r--r--src/mongo/db/repl/collection_cloner.h26
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp158
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp134
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp5
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp6
10 files changed, 382 insertions, 81 deletions
diff --git a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js
index 94664964bde..64e33014b4c 100644
--- a/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js
+++ b/jstests/noPassthroughWithMongod/initial_sync_replSetGetStatus.js
@@ -86,7 +86,8 @@
assert.eq(res.initialSyncStatus.databases.databasesCloned, 2);
assert.eq(res.initialSyncStatus.databases.test.collections, 1);
assert.eq(res.initialSyncStatus.databases.test.clonedCollections, 1);
- assert.eq(res.initialSyncStatus.databases.test["test.foo"].documents, 4);
+ assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsToCopy, 4);
+ assert.eq(res.initialSyncStatus.databases.test["test.foo"].documentsCopied, 4);
assert.eq(res.initialSyncStatus.databases.test["test.foo"].indexes, 1);
assert.eq(res.initialSyncStatus.databases.test["test.foo"].fetchedBatches, 1);
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 8eec05c66a1..d92b0575b0a 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -760,10 +760,7 @@ env.Library(
],
LIBDEPS=[
'replmocks',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/commands_test_crutch',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
],
)
@@ -776,9 +773,11 @@ env.Library(
'task_runner',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/fetcher',
+ '$BUILD_DIR/mongo/client/remote_command_retry_scheduler',
'$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/db/catalog/document_validation',
'$BUILD_DIR/mongo/executor/task_executor_interface',
+ '$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/util/progress_meter',
],
)
@@ -789,6 +788,10 @@ env.CppUnitTest(
LIBDEPS=[
'collection_cloner',
'base_cloner_test_fixture',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/commands_test_crutch',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ '$BUILD_DIR/mongo/unittest/task_executor_proxy',
],
)
@@ -810,6 +813,9 @@ env.CppUnitTest(
LIBDEPS=[
'database_cloner',
'base_cloner_test_fixture',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/commands_test_crutch',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
],
)
@@ -1004,6 +1010,7 @@ env.CppUnitTest(
'data_replicator_test.cpp',
],
LIBDEPS=[
+ 'base_cloner_test_fixture',
'data_replicator',
'data_replicator_external_state_mock',
'replication_executor_test_fixture',
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 399be148782..58eca351ca7 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -50,6 +50,11 @@ const BSONObj BaseClonerTest::idIndexSpec = BSON("v" << 1 << "key" << BSON("_id"
<< nss.ns());
// static
+BSONObj BaseClonerTest::createCountResponse(int documentCount) {
+ return BSON("n" << documentCount << "ok" << 1);
+}
+
+// static
BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId,
const std::string& ns,
const BSONArray& docs,
@@ -226,10 +231,6 @@ void BaseClonerTest::testLifeCycle() {
// StartAndCancel
setUp();
ASSERT_OK(getCloner()->startup());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- scheduleNetworkResponse(BSON("ok" << 1));
- }
getCloner()->shutdown();
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index 66943f15214..fb8ecf22861 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -58,6 +58,11 @@ public:
typedef executor::NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator;
/**
+ * Creates a count response with the given document count.
+ */
+ static BSONObj createCountResponse(int documentCount);
+
+ /**
* Creates a cursor response with given array of documents.
*/
static BSONObj createCursorResponse(CursorId cursorId,
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 1db815a789f..aeb2f61da63 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -32,12 +32,15 @@
#include "mongo/db/repl/collection_cloner.h"
+#include "mongo/base/string_data.h"
+#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
@@ -50,12 +53,16 @@ namespace {
using LockGuard = stdx::lock_guard<stdx::mutex>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
+constexpr auto kCountResponseDocumentCountFieldName = "n"_sd;
+
const int kProgressMeterSecondsBetween = 60;
const int kProgressMeterCheckInterval = 128;
// The batchSize to use for the query to get all documents from the collection.
// 16MB max batch size / 12 byte min doc size * 10 (for good measure) = batchSize to use.
const auto batchSize = (16 * 1024 * 1024) / 12 * 10;
+// The number of attempts for the count command, which gets the document count.
+MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionCountAttempts, int, 3);
// The number of attempts for the listIndexes commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListIndexesAttempts, int, 3);
// The number of attempts for the find command, which gets the data.
@@ -78,6 +85,18 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_onCompletion(onCompletion),
_storageInterface(storageInterface),
_active(false),
+ _countScheduler(_executor,
+ RemoteCommandRequest(_source,
+ _sourceNss.db().toString(),
+ BSON("count" << _sourceNss.coll()),
+ rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ nullptr,
+ RemoteCommandRequest::kNoTimeout),
+ stdx::bind(&CollectionCloner::_countCallback, this, stdx::placeholders::_1),
+ RemoteCommandRetryScheduler::makeRetryPolicy(
+ numInitialSyncCollectionCountAttempts,
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors)),
_listIndexesFetcher(_executor,
_source,
_sourceNss.db().toString(),
@@ -124,7 +143,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_dbWorkTaskRunner.schedule(task);
return executor::TaskExecutor::CallbackHandle();
}),
- _progressMeter(1U,
+ _progressMeter(1U, // total will be replaced with count command result.
kProgressMeterSecondsBetween,
kProgressMeterCheckInterval,
"documents copied",
@@ -138,10 +157,6 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface);
_stats.ns = _sourceNss.ns();
- // Hide collection size in progress output because this information is not available.
- // Additionally, even if the collection size is known, it may change while we are copying the
- // documents from the sync source.
- _progressMeter.showTotal(false);
}
CollectionCloner::~CollectionCloner() {
@@ -181,7 +196,7 @@ Status CollectionCloner::startup() {
}
_stats.start = _executor->now();
- Status scheduleResult = _listIndexesFetcher.schedule();
+ Status scheduleResult = _countScheduler.startup();
if (!scheduleResult.isOK()) {
return scheduleResult;
}
@@ -196,6 +211,7 @@ void CollectionCloner::shutdown() {
return;
}
+ _countScheduler.shutdown();
_listIndexesFetcher.shutdown();
_findFetcher.shutdown();
_dbWorkTaskRunner.cancel();
@@ -223,6 +239,75 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
_scheduleDbWorkFn = scheduleDbWorkFn;
}
+void CollectionCloner::_countCallback(
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+
+ // No need to reword status reason in the case of cancellation.
+ if (ErrorCodes::CallbackCanceled == args.response.status) {
+ _finishCallback(args.response.status);
+ return;
+ }
+
+ if (!args.response.status.isOK()) {
+ _finishCallback({args.response.status.code(),
+ str::stream() << "During count call on collection '" << _sourceNss.ns()
+ << "' from "
+ << _source.toString()
+ << ", there was an error '"
+ << args.response.status.reason()
+ << "'"});
+ return;
+ }
+
+ Status commandStatus = getStatusFromCommandResult(args.response.data);
+ if (!commandStatus.isOK()) {
+ _finishCallback({commandStatus.code(),
+ str::stream() << "During count call on collection '" << _sourceNss.ns()
+ << "' from "
+ << _source.toString()
+ << ", there was a command error '"
+ << commandStatus.reason()
+ << "'"});
+ return;
+ }
+
+ long long count = 0;
+ auto countStatus =
+ bsonExtractIntegerField(args.response.data, kCountResponseDocumentCountFieldName, &count);
+ if (!countStatus.isOK()) {
+ _finishCallback({countStatus.code(),
+ str::stream() << "There was an error parsing document count from count "
+ "command result on collection "
+ << _sourceNss.ns()
+ << " from "
+ << _source.toString()
+ << ": "
+ << countStatus.reason()});
+ return;
+ }
+
+ if (count < 0) {
+ _finishCallback({ErrorCodes::BadValue,
+ str::stream() << "Count call on collection " << _sourceNss.ns() << " from "
+ << _source.toString()
+ << " returned negative document count: "
+ << count});
+ return;
+ }
+
+ {
+ LockGuard lk(_mutex);
+ _stats.documentToCopy = count;
+ _progressMeter.setTotalWhileRunning(static_cast<unsigned long long>(count));
+ }
+
+ auto scheduleStatus = _listIndexesFetcher.schedule();
+ if (!scheduleStatus.isOK()) {
+ _finishCallback(scheduleStatus);
+ return;
+ }
+}
+
void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
@@ -394,7 +479,7 @@ void CollectionCloner::_insertDocumentsCallback(const executor::TaskExecutor::Ca
}
_documents.swap(docs);
- _stats.documents += docs.size();
+ _stats.documentsCopied += docs.size();
++_stats.fetchBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
@@ -444,6 +529,8 @@ void CollectionCloner::_finishCallback(const Status& status) {
LOG(1) << " collection: " << _destNss << ", stats: " << _stats.toString();
}
+constexpr StringData CollectionCloner::Stats::kDocumentsToCopyFieldName;
+constexpr StringData CollectionCloner::Stats::kDocumentsCopiedFieldName;
std::string CollectionCloner::Stats::toString() const {
return toBSON().toString();
@@ -457,7 +544,8 @@ BSONObj CollectionCloner::Stats::toBSON() const {
}
void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
- builder->appendNumber("documents", documents);
+ builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy);
+ builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
builder->appendNumber("indexes", indexes);
builder->appendNumber("fetchedBatches", fetchBatches);
if (start != Date_t()) {
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index fd6e4527b0f..005156a55a9 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -34,8 +34,10 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
+#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/fetcher.h"
+#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/base_cloner.h"
@@ -62,10 +64,14 @@ class CollectionCloner : public BaseCloner {
public:
struct Stats {
+ static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd;
+ static constexpr StringData kDocumentsCopiedFieldName = "documentsCopied"_sd;
+
std::string ns;
Date_t start;
Date_t end;
- size_t documents{0};
+ size_t documentToCopy{0};
+ size_t documentsCopied{0};
size_t indexes{0};
size_t fetchBatches{0};
@@ -135,6 +141,11 @@ public:
private:
/**
+ * Reads the number of documents in the collection from the count command result.
+ */
+ void _countCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& args);
+
+ /**
* Read index specs from listIndexes result.
*/
void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
@@ -197,12 +208,13 @@ private:
CallbackFn _onCompletion; // (R) Invoked once when cloning completes or fails.
StorageInterface* _storageInterface; // (R) Not owned by us.
bool _active; // (M) true when Collection Cloner is started.
- Fetcher _listIndexesFetcher; // (S)
- Fetcher _findFetcher; // (S)
- std::vector<BSONObj> _indexSpecs; // (M)
- BSONObj _idIndexSpec; // (M)
- std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert.
- TaskRunner _dbWorkTaskRunner; // (R)
+ RemoteCommandRetryScheduler _countScheduler; // (S)
+ Fetcher _listIndexesFetcher; // (S)
+ Fetcher _findFetcher; // (S)
+ std::vector<BSONObj> _indexSpecs; // (M)
+ BSONObj _idIndexSpec; // (M)
+ std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert.
+ TaskRunner _dbWorkTaskRunner; // (R)
ScheduleDbWorkFn
_scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
Stats _stats; // (M) stats for this instance.
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 6c31428d2bd..3e87c8be2b2 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
@@ -178,12 +179,146 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) {
NetworkOperationIterator noi = net->getNextReadyRequest();
auto&& noiRequest = noi->getRequest();
ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_EQUALS("listIndexes", std::string(noiRequest.cmdObj.firstElementFieldName()));
+ ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName()));
ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
ASSERT_FALSE(net->hasReadyRequests());
ASSERT_TRUE(collectionCloner->isActive());
}
+TEST_F(CollectionClonerTest, CollectionClonerSetsDocumentCountInStatsFromCountCommandResult) {
+ ASSERT_OK(collectionCloner->startup());
+
+ ASSERT_EQUALS(0U, collectionCloner->getStats().documentToCopy);
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(100));
+ }
+ getExecutor().shutdown();
+ collectionCloner->join();
+ ASSERT_EQUALS(100U, collectionCloner->getStats().documentToCopy);
+}
+
+TEST_F(CollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(ErrorCodes::OperationFailed, "");
+ }
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+}
+
+TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("ok" << 0 << "errmsg"
+ << "count error"
+ << "code"
+ << int(ErrorCodes::OperationFailed)));
+ }
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+ ASSERT_STRING_CONTAINS(getStatus().reason(), "count error");
+}
+
+TEST_F(CollectionClonerTest, CollectionClonerResendsCountCommandOnRetriableError) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(ErrorCodes::HostNotFound, "");
+ processNetworkResponse(ErrorCodes::NetworkTimeout, "");
+ processNetworkResponse(createCountResponse(100));
+ }
+ getExecutor().shutdown();
+ collectionCloner->join();
+ ASSERT_EQUALS(100U, collectionCloner->getStats().documentToCopy);
+}
+
+TEST_F(CollectionClonerTest, CollectionClonerReturnsLastRetriableErrorOnExceedingCountAttempts) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(ErrorCodes::HostNotFound, "");
+ processNetworkResponse(ErrorCodes::NetworkTimeout, "");
+ processNetworkResponse(ErrorCodes::NotMaster, "");
+ }
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::NotMaster, getStatus());
+}
+
+TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(BSON("ok" << 1));
+ }
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, getStatus());
+}
+
+TEST_F(CollectionClonerTest, CollectionClonerReturnsBadValueOnNegativeDocumentCount) {
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(-1));
+ }
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::BadValue, getStatus());
+}
+
+class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
+public:
+ using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>;
+
+ TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor,
+ ShouldFailRequestFn shouldFailRequest)
+ : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
+
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) override {
+ if (_shouldFailRequest(request)) {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
+ }
+ return getExecutor()->scheduleRemoteCommand(request, cb);
+ }
+
+private:
+ ShouldFailRequestFn _shouldFailRequest;
+};
+
+TEST_F(CollectionClonerTest,
+ CollectionClonerReturnsScheduleErrorOnFailingToScheduleListIndexesCommand) {
+ TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy(
+ &getExecutor(), [](const executor::RemoteCommandRequest& request) {
+ return str::equals("listIndexes", request.cmdObj.firstElementFieldName());
+ });
+
+ collectionCloner = stdx::make_unique<CollectionCloner>(
+ &_executorProxy,
+ dbWorkThreadPool.get(),
+ target,
+ nss,
+ options,
+ stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
+ storageInterface.get());
+
+ ASSERT_OK(collectionCloner->startup());
+
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(100));
+ }
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+}
+
TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
options.reset();
options.autoIndexId = CollectionOptions::NO;
@@ -216,6 +351,7 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSONArray()));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -249,6 +385,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) {
// the cloner stops the fetcher from retrieving more results.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(1, BSONArray()));
}
@@ -278,6 +415,7 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) {
// the cloner stops the fetcher from retrieving more results.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(ErrorCodes::NamespaceNotFound, "The collection doesn't exist.");
}
@@ -316,6 +454,7 @@ TEST_F(CollectionClonerTest,
// the cloner stops the fetcher from retrieving more results.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(ErrorCodes::NamespaceNotFound, "The collection doesn't exist.");
}
@@ -337,6 +476,7 @@ TEST_F(CollectionClonerTest, BeginCollectionScheduleDbWorkFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -362,6 +502,7 @@ TEST_F(CollectionClonerTest, BeginCollectionCallbackCanceled) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -382,6 +523,7 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -424,6 +566,7 @@ TEST_F(CollectionClonerTest, BeginCollection) {
// First batch contains the _id_ index spec.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(idIndexSpec)));
}
@@ -474,6 +617,7 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -501,6 +645,7 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -525,6 +670,7 @@ TEST_F(CollectionClonerTest, FindCommandFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -549,6 +695,7 @@ TEST_F(CollectionClonerTest, FindCommandCanceled) {
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -586,6 +733,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -613,6 +761,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -646,6 +795,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -679,6 +829,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -709,6 +860,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -753,6 +905,7 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -805,6 +958,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -866,6 +1020,7 @@ TEST_F(CollectionClonerTest, CollectionClonerCanBeRestartedAfterPreviousFailure)
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -908,6 +1063,7 @@ TEST_F(CollectionClonerTest, CollectionClonerCanBeRestartedAfterPreviousFailure)
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 69994599db2..4fe4426b60a 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -689,54 +689,59 @@ TEST_F(InitialSyncTest, Complete) {
*
*/
- const Responses responses =
+ auto lastOpAfterClone = BSON(
+ "ts" << Timestamp(Seconds(8), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion << "ns"
+ << ""
+ << "op"
+ << "i"
+ << "o"
+ << BSON("_id" << 5 << "a" << 2));
+
+ const Responses responses = {
+ {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
+ // get latest oplog ts
+ {"find",
+ fromjson(
+ str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}")},
+ // oplog fetcher find
+ {"find",
+ fromjson(
+ str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}")},
+ // Clone Start
+ // listDatabases
+ {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
+ // listCollections for "a"
+ {"listCollections",
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
+ // listIndexes:a
{
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // listIndexes:a
- {"listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:5, a:2}}]}}")},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // Applier starts ...
- };
+ "listIndexes",
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:"
+ << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
+ // find:a
+ {"find",
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}")},
+ // Clone Done
+ // get latest oplog ts
+ {"find", BaseClonerTest::createCursorResponse(0, BSON_ARRAY(lastOpAfterClone))},
+ {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
+ // Applier starts ...
+ };
// Initial sync flag should not be set before starting.
auto txn = makeOpCtx();
@@ -746,7 +751,7 @@ TEST_F(InitialSyncTest, Complete) {
// Play first response to ensure data replicator has entered initial sync state.
setResponses({responses.begin(), responses.begin() + 1});
- numGetMoreOplogEntriesMax = 6;
+ numGetMoreOplogEntriesMax = responses.size();
playResponses();
// Initial sync flag should be set.
@@ -773,9 +778,7 @@ TEST_F(InitialSyncTest, Complete) {
ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
// getMore responses are generated by playResponses().
- ASSERT_EQUALS(OpTime(Timestamp(7, 1), OpTime::kUninitializedTerm),
- OplogEntry(lastGetMoreOplogEntry).getOpTime());
- ASSERT_EQUALS(OplogEntry(lastGetMoreOplogEntry).getOpTime(), _myLastOpTime);
+ ASSERT_EQUALS(OplogEntry(lastOpAfterClone).getOpTime(), _myLastOpTime);
}
TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
@@ -804,6 +807,8 @@ TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCl
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -939,6 +944,8 @@ TEST_F(InitialSyncTest, FailOnRollback) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -965,7 +972,7 @@ TEST_F(InitialSyncTest, FailOnRollback) {
};
startSync(1);
- numGetMoreOplogEntriesMax = 5;
+ numGetMoreOplogEntriesMax = responses.size();
setResponses(responses);
playResponses();
verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError);
@@ -998,6 +1005,8 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError)
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1023,7 +1032,7 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError)
};
startSync(1);
- numGetMoreOplogEntriesMax = 5;
+ numGetMoreOplogEntriesMax = responses.size();
setResponses(responses);
playResponses();
getExecutor().shutdown();
@@ -1101,6 +1110,8 @@ TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1142,7 +1153,7 @@ TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) {
startSync(1);
- numGetMoreOplogEntriesMax = 10;
+ numGetMoreOplogEntriesMax = responses.size();
setResponses({responses.begin(), responses.end() - 4});
playResponses();
log() << "done playing first responses";
@@ -1182,6 +1193,8 @@ TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1208,7 +1221,7 @@ TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) {
startSync(2);
- numGetMoreOplogEntriesMax = 6;
+ numGetMoreOplogEntriesMax = responses.size();
setResponses(responses);
playResponses();
log() << "done playing first responses";
@@ -1277,6 +1290,8 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 5 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1327,7 +1342,7 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
// Play first 2 responses to ensure data replicator has started the oplog fetcher.
setResponses({failedResponses.begin(), failedResponses.begin() + 2});
- numGetMoreOplogEntriesMax = 10;
+ numGetMoreOplogEntriesMax = failedResponses.size() + successfulResponses.size();
playResponses();
log() << "Done playing first failed response";
@@ -1397,7 +1412,12 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
auto collectionProgress = dbProgress.getObjectField("a.a");
- ASSERT_EQUALS(5, collectionProgress.getIntField("documents")) << collectionProgress;
+ ASSERT_EQUALS(
+ 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsToCopyFieldName))
+ << collectionProgress;
+ ASSERT_EQUALS(
+ 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName))
+ << collectionProgress;
ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress;
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index 94e93be5be4..e5cb2b464e2 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -540,6 +540,7 @@ TEST_F(DatabaseClonerTest, StartSecondCollectionClonerFailed) {
<< "options"
<< BSONObj()))));
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
processNetworkResponse(createCursorResponse(0, BSONArray()));
}
@@ -571,11 +572,13 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) {
// This affects the order of the network responses.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(BSON("ok" << 0 << "errmsg"
<< "fake message"
<< "code"
<< ErrorCodes::CursorNotFound));
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
processNetworkResponse(createCursorResponse(0, BSONArray()));
}
@@ -621,6 +624,7 @@ TEST_F(DatabaseClonerTest, CreateCollections) {
// This affects the order of the network responses.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(_databaseCloner->isActive());
@@ -632,6 +636,7 @@ TEST_F(DatabaseClonerTest, CreateCollections) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(_databaseCloner->isActive());
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 110db06f9d8..740a194e490 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -628,6 +628,8 @@ TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{
"listIndexes",
@@ -657,6 +659,8 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
+ // count:a
+ {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -674,6 +678,8 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.$cmd.listCollections', firstBatch:["
"{name:'b', options:{}} "
"]}}")},
+ // count:b
+ {"count", BSON("n" << 2 << "ok" << 1)},
// listIndexes:b
{"listIndexes",
fromjson(str::stream()