summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSpencer Jackson <spencer.jackson@mongodb.com>2016-10-07 17:48:08 -0400
committerSpencer Jackson <spencer.jackson@mongodb.com>2016-10-07 17:48:08 -0400
commit7ef545bd48c4a7a4a91305628ecfa7084d2956a6 (patch)
tree0fd0f5c5eec4b53f0c2ecc72e955b12ba7700ec2 /src/mongo/db
parenta477504c2090657a54d8ef8cd08130ef29375a8b (diff)
downloadmongo-7ef545bd48c4a7a4a91305628ecfa7084d2956a6.tar.gz
Revert "SERVER-26520 CollectionCloner fetches document count for progress tracking from sync source before copying documents"
This reverts commit 44957419b2dd4e6b3f7be8b817a7fcd8c71d643d.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript13
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp5
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.h5
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp104
-rw-r--r--src/mongo/db/repl/collection_cloner.h26
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp158
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp134
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp5
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp6
9 files changed, 76 insertions, 380 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d92b0575b0a..8eec05c66a1 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -760,7 +760,10 @@ env.Library(
],
LIBDEPS=[
'replmocks',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/commands_test_crutch',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
],
)
@@ -773,11 +776,9 @@ env.Library(
'task_runner',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/fetcher',
- '$BUILD_DIR/mongo/client/remote_command_retry_scheduler',
'$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/db/catalog/document_validation',
'$BUILD_DIR/mongo/executor/task_executor_interface',
- '$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/util/progress_meter',
],
)
@@ -788,10 +789,6 @@ env.CppUnitTest(
LIBDEPS=[
'collection_cloner',
'base_cloner_test_fixture',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/commands_test_crutch',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- '$BUILD_DIR/mongo/unittest/task_executor_proxy',
],
)
@@ -813,9 +810,6 @@ env.CppUnitTest(
LIBDEPS=[
'database_cloner',
'base_cloner_test_fixture',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/commands_test_crutch',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
],
)
@@ -1010,7 +1004,6 @@ env.CppUnitTest(
'data_replicator_test.cpp',
],
LIBDEPS=[
- 'base_cloner_test_fixture',
'data_replicator',
'data_replicator_external_state_mock',
'replication_executor_test_fixture',
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 59c8ff3c0f0..399be148782 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -50,11 +50,6 @@ const BSONObj BaseClonerTest::idIndexSpec = BSON("v" << 1 << "key" << BSON("_id"
<< nss.ns());
// static
-BSONObj BaseClonerTest::createCountResponse(int documentCount) {
- return BSON("n" << documentCount << "ok" << 1);
-}
-
-// static
BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId,
const std::string& ns,
const BSONArray& docs,
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index fb8ecf22861..66943f15214 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -58,11 +58,6 @@ public:
typedef executor::NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator;
/**
- * Creates a count response with given document count.
- */
- static BSONObj createCountResponse(int documentCount);
-
- /**
* Creates a cursor response with given array of documents.
*/
static BSONObj createCursorResponse(CursorId cursorId,
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index aeb2f61da63..1db815a789f 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -32,15 +32,12 @@
#include "mongo/db/repl/collection_cloner.h"
-#include "mongo/base/string_data.h"
-#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_parameters.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
@@ -53,16 +50,12 @@ namespace {
using LockGuard = stdx::lock_guard<stdx::mutex>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;
-constexpr auto kCountResponseDocumentCountFieldName = "n"_sd;
-
const int kProgressMeterSecondsBetween = 60;
const int kProgressMeterCheckInterval = 128;
// The batchSize to use for the query to get all documents from the collection.
// 16MB max batch size / 12 byte min doc size * 10 (for good measure) = batchSize to use.
const auto batchSize = (16 * 1024 * 1024) / 12 * 10;
-// The number of attempts for the count command, which gets the document count.
-MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionCountAttempts, int, 3);
// The number of attempts for the listIndexes commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListIndexesAttempts, int, 3);
// The number of attempts for the find command, which gets the data.
@@ -85,18 +78,6 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_onCompletion(onCompletion),
_storageInterface(storageInterface),
_active(false),
- _countScheduler(_executor,
- RemoteCommandRequest(_source,
- _sourceNss.db().toString(),
- BSON("count" << _sourceNss.coll()),
- rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
- nullptr,
- RemoteCommandRequest::kNoTimeout),
- stdx::bind(&CollectionCloner::_countCallback, this, stdx::placeholders::_1),
- RemoteCommandRetryScheduler::makeRetryPolicy(
- numInitialSyncCollectionCountAttempts,
- executor::RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::kAllRetriableErrors)),
_listIndexesFetcher(_executor,
_source,
_sourceNss.db().toString(),
@@ -143,7 +124,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
_dbWorkTaskRunner.schedule(task);
return executor::TaskExecutor::CallbackHandle();
}),
- _progressMeter(1U, // total will be replaced with count command result.
+ _progressMeter(1U,
kProgressMeterSecondsBetween,
kProgressMeterCheckInterval,
"documents copied",
@@ -157,6 +138,10 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
uassert(ErrorCodes::BadValue, "storage interface cannot be null", storageInterface);
_stats.ns = _sourceNss.ns();
+ // Hide collection size in progress output because this information is not available.
+ // Additionally, even if the collection size is known, it may change while we are copying the
+ // documents from the sync source.
+ _progressMeter.showTotal(false);
}
CollectionCloner::~CollectionCloner() {
@@ -196,7 +181,7 @@ Status CollectionCloner::startup() {
}
_stats.start = _executor->now();
- Status scheduleResult = _countScheduler.startup();
+ Status scheduleResult = _listIndexesFetcher.schedule();
if (!scheduleResult.isOK()) {
return scheduleResult;
}
@@ -211,7 +196,6 @@ void CollectionCloner::shutdown() {
return;
}
- _countScheduler.shutdown();
_listIndexesFetcher.shutdown();
_findFetcher.shutdown();
_dbWorkTaskRunner.cancel();
@@ -239,75 +223,6 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
_scheduleDbWorkFn = scheduleDbWorkFn;
}
-void CollectionCloner::_countCallback(
- const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
-
- // No need to reword status reason in the case of cancellation.
- if (ErrorCodes::CallbackCanceled == args.response.status) {
- _finishCallback(args.response.status);
- return;
- }
-
- if (!args.response.status.isOK()) {
- _finishCallback({args.response.status.code(),
- str::stream() << "During count call on collection '" << _sourceNss.ns()
- << "' from "
- << _source.toString()
- << ", there was an error '"
- << args.response.status.reason()
- << "'"});
- return;
- }
-
- Status commandStatus = getStatusFromCommandResult(args.response.data);
- if (!commandStatus.isOK()) {
- _finishCallback({commandStatus.code(),
- str::stream() << "During count call on collection '" << _sourceNss.ns()
- << "' from "
- << _source.toString()
- << ", there was a command error '"
- << commandStatus.reason()
- << "'"});
- return;
- }
-
- long long count = 0;
- auto countStatus =
- bsonExtractIntegerField(args.response.data, kCountResponseDocumentCountFieldName, &count);
- if (!countStatus.isOK()) {
- _finishCallback({countStatus.code(),
- str::stream() << "There was an error parsing document count from count "
- "command result on collection "
- << _sourceNss.ns()
- << " from "
- << _source.toString()
- << ": "
- << countStatus.reason()});
- return;
- }
-
- if (count < 0) {
- _finishCallback({ErrorCodes::BadValue,
- str::stream() << "Count call on collection " << _sourceNss.ns() << " from "
- << _source.toString()
- << " returned negative document count: "
- << count});
- return;
- }
-
- {
- LockGuard lk(_mutex);
- _stats.documentToCopy = count;
- _progressMeter.setTotalWhileRunning(static_cast<unsigned long long>(count));
- }
-
- auto scheduleStatus = _listIndexesFetcher.schedule();
- if (!scheduleStatus.isOK()) {
- _finishCallback(scheduleStatus);
- return;
- }
-}
-
void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
@@ -479,7 +394,7 @@ void CollectionCloner::_insertDocumentsCallback(const executor::TaskExecutor::Ca
}
_documents.swap(docs);
- _stats.documentsCopied += docs.size();
+ _stats.documents += docs.size();
++_stats.fetchBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
@@ -529,8 +444,6 @@ void CollectionCloner::_finishCallback(const Status& status) {
LOG(1) << " collection: " << _destNss << ", stats: " << _stats.toString();
}
-constexpr StringData CollectionCloner::Stats::kDocumentsToCopyFieldName;
-constexpr StringData CollectionCloner::Stats::kDocumentsCopiedFieldName;
std::string CollectionCloner::Stats::toString() const {
return toBSON().toString();
@@ -544,8 +457,7 @@ BSONObj CollectionCloner::Stats::toBSON() const {
}
void CollectionCloner::Stats::append(BSONObjBuilder* builder) const {
- builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy);
- builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
+ builder->appendNumber("documents", documents);
builder->appendNumber("indexes", indexes);
builder->appendNumber("fetchedBatches", fetchBatches);
if (start != Date_t()) {
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 005156a55a9..fd6e4527b0f 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -34,10 +34,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
-#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/client/fetcher.h"
-#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/base_cloner.h"
@@ -64,14 +62,10 @@ class CollectionCloner : public BaseCloner {
public:
struct Stats {
- static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd;
- static constexpr StringData kDocumentsCopiedFieldName = "documentsCopied"_sd;
-
std::string ns;
Date_t start;
Date_t end;
- size_t documentToCopy{0};
- size_t documentsCopied{0};
+ size_t documents{0};
size_t indexes{0};
size_t fetchBatches{0};
@@ -141,11 +135,6 @@ public:
private:
/**
- * Read number of documents in collection from count result.
- */
- void _countCallback(const executor::TaskExecutor::RemoteCommandCallbackArgs& args);
-
- /**
* Read index specs from listIndexes result.
*/
void _listIndexesCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
@@ -208,13 +197,12 @@ private:
CallbackFn _onCompletion; // (R) Invoked once when cloning completes or fails.
StorageInterface* _storageInterface; // (R) Not owned by us.
bool _active; // (M) true when Collection Cloner is started.
- RemoteCommandRetryScheduler _countScheduler; // (S)
- Fetcher _listIndexesFetcher; // (S)
- Fetcher _findFetcher; // (S)
- std::vector<BSONObj> _indexSpecs; // (M)
- BSONObj _idIndexSpec; // (M)
- std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert.
- TaskRunner _dbWorkTaskRunner; // (R)
+ Fetcher _listIndexesFetcher; // (S)
+ Fetcher _findFetcher; // (S)
+ std::vector<BSONObj> _indexSpecs; // (M)
+ BSONObj _idIndexSpec; // (M)
+ std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert.
+ TaskRunner _dbWorkTaskRunner; // (R)
ScheduleDbWorkFn
_scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
Stats _stats; // (M) stats for this instance.
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 3e87c8be2b2..6c31428d2bd 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/stdx/memory.h"
-#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"
@@ -179,146 +178,12 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) {
NetworkOperationIterator noi = net->getNextReadyRequest();
auto&& noiRequest = noi->getRequest();
ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName()));
+ ASSERT_EQUALS("listIndexes", std::string(noiRequest.cmdObj.firstElementFieldName()));
ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
ASSERT_FALSE(net->hasReadyRequests());
ASSERT_TRUE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerTest, CollectionClonerSetsDocumentCountInStatsFromCountCommandResult) {
- ASSERT_OK(collectionCloner->startup());
-
- ASSERT_EQUALS(0U, collectionCloner->getStats().documentToCopy);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(100));
- }
- getExecutor().shutdown();
- collectionCloner->join();
- ASSERT_EQUALS(100U, collectionCloner->getStats().documentToCopy);
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::OperationFailed, "");
- }
- collectionCloner->join();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerPassesThroughCommandStatusErrorFromCountCommand) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("ok" << 0 << "errmsg"
- << "count error"
- << "code"
- << int(ErrorCodes::OperationFailed)));
- }
- collectionCloner->join();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
- ASSERT_STRING_CONTAINS(getStatus().reason(), "count error");
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerResendsCountCommandOnRetriableError) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::HostNotFound, "");
- processNetworkResponse(ErrorCodes::NetworkTimeout, "");
- processNetworkResponse(createCountResponse(100));
- }
- getExecutor().shutdown();
- collectionCloner->join();
- ASSERT_EQUALS(100U, collectionCloner->getStats().documentToCopy);
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerReturnsLastRetriableErrorOnExceedingCountAttempts) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::HostNotFound, "");
- processNetworkResponse(ErrorCodes::NetworkTimeout, "");
- processNetworkResponse(ErrorCodes::NotMaster, "");
- }
- collectionCloner->join();
- ASSERT_EQUALS(ErrorCodes::NotMaster, getStatus());
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("ok" << 1));
- }
- collectionCloner->join();
- ASSERT_EQUALS(ErrorCodes::NoSuchKey, getStatus());
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerReturnsBadValueOnNegativeDocumentCount) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(-1));
- }
- collectionCloner->join();
- ASSERT_EQUALS(ErrorCodes::BadValue, getStatus());
-}
-
-class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
-public:
- using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>;
-
- TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor,
- ShouldFailRequestFn shouldFailRequest)
- : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
-
- StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
- const RemoteCommandCallbackFn& cb) override {
- if (_shouldFailRequest(request)) {
- return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
- }
- return getExecutor()->scheduleRemoteCommand(request, cb);
- }
-
-private:
- ShouldFailRequestFn _shouldFailRequest;
-};
-
-TEST_F(CollectionClonerTest,
- CollectionClonerReturnsScheduleErrorOnFailingToScheduleListIndexesCommand) {
- TaskExecutorWithFailureInScheduleRemoteCommand _executorProxy(
- &getExecutor(), [](const executor::RemoteCommandRequest& request) {
- return str::equals("listIndexes", request.cmdObj.firstElementFieldName());
- });
-
- collectionCloner = stdx::make_unique<CollectionCloner>(
- &_executorProxy,
- dbWorkThreadPool.get(),
- target,
- nss,
- options,
- stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
- storageInterface.get());
-
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(100));
- }
- collectionCloner->join();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
-}
-
TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
options.reset();
options.autoIndexId = CollectionOptions::NO;
@@ -351,7 +216,6 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSONArray()));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -385,7 +249,6 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) {
// the cloner stops the fetcher from retrieving more results.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(1, BSONArray()));
}
@@ -415,7 +278,6 @@ TEST_F(CollectionClonerTest, ListIndexesReturnedNamespaceNotFound) {
// the cloner stops the fetcher from retrieving more results.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(ErrorCodes::NamespaceNotFound, "The collection doesn't exist.");
}
@@ -454,7 +316,6 @@ TEST_F(CollectionClonerTest,
// the cloner stops the fetcher from retrieving more results.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(ErrorCodes::NamespaceNotFound, "The collection doesn't exist.");
}
@@ -476,7 +337,6 @@ TEST_F(CollectionClonerTest, BeginCollectionScheduleDbWorkFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -502,7 +362,6 @@ TEST_F(CollectionClonerTest, BeginCollectionCallbackCanceled) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -523,7 +382,6 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -566,7 +424,6 @@ TEST_F(CollectionClonerTest, BeginCollection) {
// First batch contains the _id_ index spec.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(idIndexSpec)));
}
@@ -617,7 +474,6 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -645,7 +501,6 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -670,7 +525,6 @@ TEST_F(CollectionClonerTest, FindCommandFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -695,7 +549,6 @@ TEST_F(CollectionClonerTest, FindCommandCanceled) {
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -733,7 +586,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -761,7 +613,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -795,7 +646,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -829,7 +679,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -860,7 +709,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -905,7 +753,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -958,7 +805,6 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -1020,7 +866,6 @@ TEST_F(CollectionClonerTest, CollectionClonerCanBeRestartedAfterPreviousFailure)
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
@@ -1063,7 +908,6 @@ TEST_F(CollectionClonerTest, CollectionClonerCanBeRestartedAfterPreviousFailure)
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 4fe4426b60a..69994599db2 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -689,59 +689,54 @@ TEST_F(InitialSyncTest, Complete) {
*
*/
- auto lastOpAfterClone = BSON(
- "ts" << Timestamp(Seconds(8), 1U) << "h" << 1LL << "v" << OplogEntry::kOplogVersion << "ns"
- << ""
- << "op"
- << "i"
- << "o"
- << BSON("_id" << 5 << "a" << 2));
-
- const Responses responses = {
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // get latest oplog ts
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // oplog fetcher find
- {"find",
- fromjson(
- str::stream() << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
- "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
- << OplogEntry::kOplogVersion
- << ", op:'i', o:{_id:1, a:1}}]}}")},
- // Clone Start
- // listDatabases
- {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
- // listCollections for "a"
- {"listCollections",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
- "{name:'a', options:{}} "
- "]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
- // listIndexes:a
+ const Responses responses =
{
- "listIndexes",
- fromjson(str::stream()
- << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
- "{v:"
- << OplogEntry::kOplogVersion
- << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
- // find:a
- {"find",
- fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
- "{_id:1, a:1} "
- "]}}")},
- // Clone Done
- // get latest oplog ts
- {"find", BaseClonerTest::createCursorResponse(0, BSON_ARRAY(lastOpAfterClone))},
- {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
- // Applier starts ...
- };
+ {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
+ // get latest oplog ts
+ {"find",
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}")},
+ // oplog fetcher find
+ {"find",
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(1), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(1,1), h:NumberLong(1), ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:1, a:1}}]}}")},
+ // Clone Start
+ // listDatabases
+ {"listDatabases", fromjson("{ok:1, databases:[{name:'a'}]}")},
+ // listCollections for "a"
+ {"listCollections",
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
+ "{name:'a', options:{}} "
+ "]}}")},
+ // listIndexes:a
+ {"listIndexes",
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listIndexes.a', firstBatch:["
+ "{v:"
+ << OplogEntry::kOplogVersion
+ << ", key:{_id:1}, name:'_id_', ns:'a.a'}]}}")},
+ // find:a
+ {"find",
+ fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.a', firstBatch:["
+ "{_id:1, a:1} "
+ "]}}")},
+ // Clone Done
+ // get latest oplog ts
+ {"find",
+ fromjson(str::stream()
+ << "{ok:1, cursor:{id:NumberLong(0), ns:'local.oplog.rs', firstBatch:["
+ "{ts:Timestamp(7,1), h:NumberLong(1), ns:'a.a', v:"
+ << OplogEntry::kOplogVersion
+ << ", op:'i', o:{_id:5, a:2}}]}}")},
+ {"replSetGetRBID", fromjson(str::stream() << "{ok: 1, rbid:1}")},
+ // Applier starts ...
+ };
// Initial sync flag should not be set before starting.
auto txn = makeOpCtx();
@@ -751,7 +746,7 @@ TEST_F(InitialSyncTest, Complete) {
// Play first response to ensure data replicator has entered initial sync state.
setResponses({responses.begin(), responses.begin() + 1});
- numGetMoreOplogEntriesMax = responses.size();
+ numGetMoreOplogEntriesMax = 6;
playResponses();
// Initial sync flag should be set.
@@ -778,7 +773,9 @@ TEST_F(InitialSyncTest, Complete) {
ASSERT_FALSE(getStorage().getInitialSyncFlag(txn.get()));
// getMore responses are generated by playResponses().
- ASSERT_EQUALS(OplogEntry(lastOpAfterClone).getOpTime(), _myLastOpTime);
+ ASSERT_EQUALS(OpTime(Timestamp(7, 1), OpTime::kUninitializedTerm),
+ OplogEntry(lastGetMoreOplogEntry).getOpTime());
+ ASSERT_EQUALS(OplogEntry(lastGetMoreOplogEntry).getOpTime(), _myLastOpTime);
}
TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCloning) {
@@ -807,8 +804,6 @@ TEST_F(InitialSyncTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfterCl
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -944,8 +939,6 @@ TEST_F(InitialSyncTest, FailOnRollback) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -972,7 +965,7 @@ TEST_F(InitialSyncTest, FailOnRollback) {
};
startSync(1);
- numGetMoreOplogEntriesMax = responses.size();
+ numGetMoreOplogEntriesMax = 5;
setResponses(responses);
playResponses();
verifySync(getNet(), ErrorCodes::UnrecoverableRollbackError);
@@ -1005,8 +998,6 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError)
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1032,7 +1023,7 @@ TEST_F(InitialSyncTest, DataReplicatorPassesThroughRollbackCheckerScheduleError)
};
startSync(1);
- numGetMoreOplogEntriesMax = responses.size();
+ numGetMoreOplogEntriesMax = 5;
setResponses(responses);
playResponses();
getExecutor().shutdown();
@@ -1110,8 +1101,6 @@ TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1153,7 +1142,7 @@ TEST_F(InitialSyncTest, OplogOutOfOrderOnOplogFetchFinish) {
startSync(1);
- numGetMoreOplogEntriesMax = responses.size();
+ numGetMoreOplogEntriesMax = 10;
setResponses({responses.begin(), responses.end() - 4});
playResponses();
log() << "done playing first responses";
@@ -1193,8 +1182,6 @@ TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1221,7 +1208,7 @@ TEST_F(InitialSyncTest, InitialSyncStateIsResetAfterFailure) {
startSync(2);
- numGetMoreOplogEntriesMax = responses.size();
+ numGetMoreOplogEntriesMax = 6;
setResponses(responses);
playResponses();
log() << "done playing first responses";
@@ -1290,8 +1277,6 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 5 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -1342,7 +1327,7 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
// Play first 2 responses to ensure data replicator has started the oplog fetcher.
setResponses({failedResponses.begin(), failedResponses.begin() + 2});
- numGetMoreOplogEntriesMax = failedResponses.size() + successfulResponses.size();
+ numGetMoreOplogEntriesMax = 10;
playResponses();
log() << "Done playing first failed response";
@@ -1412,12 +1397,7 @@ TEST_F(InitialSyncTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(1, dbProgress.getIntField("collections")) << dbProgress;
ASSERT_EQUALS(1, dbProgress.getIntField("clonedCollections")) << dbProgress;
auto collectionProgress = dbProgress.getObjectField("a.a");
- ASSERT_EQUALS(
- 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsToCopyFieldName))
- << collectionProgress;
- ASSERT_EQUALS(
- 5, collectionProgress.getIntField(CollectionCloner::Stats::kDocumentsCopiedFieldName))
- << collectionProgress;
+ ASSERT_EQUALS(5, collectionProgress.getIntField("documents")) << collectionProgress;
ASSERT_EQUALS(1, collectionProgress.getIntField("indexes")) << collectionProgress;
ASSERT_EQUALS(5, collectionProgress.getIntField("fetchedBatches")) << collectionProgress;
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index e5cb2b464e2..94e93be5be4 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -540,7 +540,6 @@ TEST_F(DatabaseClonerTest, StartSecondCollectionClonerFailed) {
<< "options"
<< BSONObj()))));
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
processNetworkResponse(createCursorResponse(0, BSONArray()));
}
@@ -572,13 +571,11 @@ TEST_F(DatabaseClonerTest, FirstCollectionListIndexesFailed) {
// This affects the order of the network responses.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(BSON("ok" << 0 << "errmsg"
<< "fake message"
<< "code"
<< ErrorCodes::CursorNotFound));
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
processNetworkResponse(createCursorResponse(0, BSONArray()));
}
@@ -624,7 +621,6 @@ TEST_F(DatabaseClonerTest, CreateCollections) {
// This affects the order of the network responses.
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(_databaseCloner->isActive());
@@ -636,7 +632,6 @@ TEST_F(DatabaseClonerTest, CreateCollections) {
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(_databaseCloner->isActive());
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 740a194e490..110db06f9d8 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -628,8 +628,6 @@ TEST_F(DBsClonerTest, SingleDatabaseCopiesCompletely) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{
"listIndexes",
@@ -659,8 +657,6 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'a.$cmd.listCollections', firstBatch:["
"{name:'a', options:{}} "
"]}}")},
- // count:a
- {"count", BSON("n" << 1 << "ok" << 1)},
// listIndexes:a
{"listIndexes",
fromjson(str::stream()
@@ -678,8 +674,6 @@ TEST_F(DBsClonerTest, TwoDatabasesCopiesCompletely) {
fromjson("{ok:1, cursor:{id:NumberLong(0), ns:'b.$cmd.listCollections', firstBatch:["
"{name:'b', options:{}} "
"]}}")},
- // count:b
- {"count", BSON("n" << 2 << "ok" << 1)},
// listIndexes:b
{"listIndexes",
fromjson(str::stream()