author    William Schultz <william.schultz@mongodb.com>  2017-08-11 17:09:56 -0400
committer William Schultz <william.schultz@mongodb.com>  2017-08-11 17:09:56 -0400
commit    8ddfeab4644aa38082b3cd03cfeef1dd6c65d35a (patch)
tree      0be30537fbd87401cf57ac1cd3d414884eaab2dd /src/mongo/db
parent    0d3137df3879e86d92904309e968f25529904639 (diff)
Revert "SERVER-29617 replace fetcher with ARM and add numCursors server parameter"
This reverts commit 0d3137df3879e86d92904309e968f25529904639.
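For context, the approach this revert restores drives collection cloning with a single cursor: one 'find' opens the cursor and returns the first batch, then 'getMore' requests drain it until the server reports cursor id 0. A minimal sketch of that loop follows, using stand-in types and hypothetical transport hooks (runFind/runGetMore) rather than the real Fetcher/TaskExecutor machinery:

// Minimal sketch of the single-cursor clone loop this revert restores.
// All types and hooks here are illustrative stand-ins, not the server classes.
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

struct Doc { std::string json; };        // stand-in for BSONObj
struct Batch {
    std::int64_t cursorId;               // 0 means the cursor is exhausted
    std::vector<Doc> docs;
};

// Hypothetical blocking transport hooks; the real code goes through Fetcher
// callbacks scheduled on a TaskExecutor instead.
Batch runFind(const std::string& coll, int batchSize);
Batch runGetMore(std::int64_t cursorId, const std::string& coll, int batchSize);

void cloneCollection(const std::string& coll,
                     int batchSize,
                     const std::function<void(const std::vector<Doc>&)>& insertBatch) {
    Batch batch = runFind(coll, batchSize);   // initial 'find' opens the cursor
    insertBatch(batch.docs);
    while (batch.cursorId != 0) {             // cursor id 0 signals the last batch
        batch = runGetMore(batch.cursorId, coll, batchSize);
        insertBatch(batch.docs);
    }
}

The reverted change had instead established one or more cursors (via 'find' or 'parallelCollectionScan') and merged their batches through an AsyncResultsMerger, as the removed code below shows.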
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/base_cloner_test_fixture.cpp |    5
-rw-r--r--  src/mongo/db/repl/base_cloner_test_fixture.h   |    2
-rw-r--r--  src/mongo/db/repl/collection_cloner.cpp        |  417
-rw-r--r--  src/mongo/db/repl/collection_cloner.h          |   92
-rw-r--r--  src/mongo/db/repl/collection_cloner_test.cpp   |  894
-rw-r--r--  src/mongo/db/repl/database_cloner.cpp          |    6
-rw-r--r--  src/mongo/db/s/sharding_task_executor.cpp      |   10
7 files changed, 175 insertions(+), 1251 deletions(-)
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index fa418d338fb..e1a8131fb10 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -76,11 +76,6 @@ BSONObj BaseClonerTest::createCursorResponse(CursorId cursorId, const BSONArray&
}
// static
-BSONObj BaseClonerTest::createFinalCursorResponse(const BSONArray& docs) {
- return createCursorResponse(0, docs, "nextBatch");
-}
-
-// static
BSONObj BaseClonerTest::createListCollectionsResponse(CursorId cursorId,
const BSONArray& colls,
const char* fieldName) {
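For reference, the helper removed above was a thin wrapper: createCursorResponse(0, docs, "nextBatch") builds a terminal batch response, i.e. a 'nextBatch' whose cursor id is 0, which tells the client the cursor is exhausted. The response has roughly this shape (sketch using the server's BSON macros; the namespace value is illustrative):

// Shape of the response the removed createFinalCursorResponse(docs) produced.
// A cursor id of 0 signals that there are no more batches to fetch.
BSONObj finalResponse = BSON("cursor" << BSON("id" << CursorId(0)
                                              << "ns" << "test.coll"  // illustrative ns
                                              << "nextBatch" << docs)
                                      << "ok" << 1);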
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index e5615806fc6..fb8ecf22861 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -76,8 +76,6 @@ public:
static BSONObj createCursorResponse(CursorId cursorId, const BSONArray& docs);
- static BSONObj createFinalCursorResponse(const BSONArray& docs);
-
/**
* Creates a listCollections response with given array of index specs.
*/
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 98d658b49f9..13b27d0ed01 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -74,9 +74,9 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3);
// collection 'namespace'.
MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
-// Failpoint which causes initial sync to hang after handling the next batch of results from the
-// 'AsyncResultsMerger' for a specific collection.
-MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
+// Failpoint which causes initial sync to hang after the initial 'find' command of collection
+// cloning, for a specific collection.
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterInitialFind);
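The renamed failpoint is data-driven: as the MONGO_FAIL_POINT_BLOCK further down in this file shows, it reads an 'nss' field from the failpoint's data document and hangs only while cloning that namespace. A test would enable it with the configureFailPoint command, roughly like this (BSON sketch; the namespace is illustrative):

// Enable the failpoint for one collection only; the cloner then busy-waits
// after its initial 'find' until the failpoint is switched off.
BSONObj cmd = BSON("configureFailPoint"
                   << "initialSyncHangCollectionClonerAfterInitialFind"
                   << "mode" << "alwaysOn"
                   << "data" << BSON("nss" << "test.coll"));  // illustrative namespace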
CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
OldThreadPool* dbWorkThreadPool,
@@ -85,8 +85,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
const CollectionOptions& options,
const CallbackFn& onCompletion,
StorageInterface* storageInterface,
- const int batchSize,
- const int maxNumClonerCursors)
+ const int batchSize)
: _executor(executor),
_dbWorkThreadPool(dbWorkThreadPool),
_source(source),
@@ -123,7 +122,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
executor::RemoteCommandRequest::kNoTimeout,
RemoteCommandRetryScheduler::kAllRetriableErrors)),
_indexSpecs(),
- _documentsToInsert(),
+ _documents(),
_dbWorkTaskRunner(_dbWorkThreadPool),
_scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
auto task = [work](OperationContext* opCtx,
@@ -139,8 +138,9 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
kProgressMeterCheckInterval,
"documents copied",
str::stream() << _sourceNss.toString() << " collection clone progress"),
- _collectionCloningBatchSize(batchSize),
- _maxNumClonerCursors(maxNumClonerCursors) {
+ _batchSize(batchSize),
+ _arm(executor,
+ stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()).get()) {
// Fetcher throws an exception on null executor.
invariant(executor);
uassert(ErrorCodes::BadValue,
@@ -215,19 +215,15 @@ void CollectionCloner::shutdown() {
// Nothing to do if we are already in ShuttingDown or Complete state.
return;
}
+
_cancelRemainingWork_inlock();
}
void CollectionCloner::_cancelRemainingWork_inlock() {
- if (_arm) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _killArmHandle = _arm->kill(opCtx);
- }
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
- if (_establishCollectionCursorsScheduler) {
- _establishCollectionCursorsScheduler->shutdown();
+ if (_findFetcher) {
+ _findFetcher->shutdown();
}
_dbWorkTaskRunner.cancel();
}
@@ -239,9 +235,6 @@ CollectionCloner::Stats CollectionCloner::getStats() const {
void CollectionCloner::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_killArmHandle) {
- _executor->waitForEvent(_killArmHandle);
- }
_condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
@@ -257,11 +250,6 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
_scheduleDbWorkFn = scheduleDbWorkFn;
}
-std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() {
- LockGuard lk(_mutex);
- return _documentsToInsert;
-}
-
void CollectionCloner::_countCallback(
const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
@@ -403,181 +391,80 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus&
}
}
-void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
- if (!cbd.status.isOK()) {
- _finishCallback(cbd.status);
- return;
- }
- if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
- warning()
- << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
- << this->_sourceNss;
- }
+void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ if (!fetchResult.isOK()) {
+ // Wait for active inserts to complete.
+ waitForDbWorker();
- auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading(
- _destNss, _options, _idIndexSpec, _indexSpecs);
+ Status newStatus{fetchResult.getStatus().code(),
+ str::stream() << "While querying collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << fetchResult.getStatus().reason()
+ << "'"};
- if (!collectionBulkLoader.isOK()) {
- _finishCallback(collectionBulkLoader.getStatus());
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
return;
}
- _stats.indexes = _indexSpecs.size();
- if (!_idIndexSpec.isEmpty()) {
- ++_stats.indexes;
- }
-
- _collLoader = std::move(collectionBulkLoader.getValue());
-
- BSONObjBuilder cmdObj;
- EstablishCursorsCommand cursorCommand;
- // The 'find' command is used when the number of cloning cursors is 1 to ensure
- // the correctness of the collection cloning process until 'parallelCollectionScan'
- // can be tested more extensively in context of initial sync.
- if (_maxNumClonerCursors == 1) {
- cmdObj.append("find", _sourceNss.coll());
- cmdObj.append("noCursorTimeout", true);
- // Set batchSize to be 0 to establish the cursor without fetching any documents,
- // similar to the response format of 'parallelCollectionScan'.
- cmdObj.append("batchSize", 0);
- cursorCommand = Find;
- } else {
- cmdObj.append("parallelCollectionScan", _sourceNss.coll());
- cmdObj.append("numCursors", _maxNumClonerCursors);
- cursorCommand = ParallelCollScan;
+ auto batchData(fetchResult.getValue());
+ bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction;
+ if (batchData.documents.size() > 0) {
+ LockGuard lk(_mutex);
+ _documents.insert(_documents.end(), batchData.documents.begin(), batchData.documents.end());
+ } else if (!batchData.first) {
+ warning() << "No documents returned in batch; ns: " << _sourceNss
+ << ", cursorId:" << batchData.cursorId << ", isLastBatch:" << lastBatch;
}
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
-
- _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
- _executor,
- RemoteCommandRequest(_source,
- _sourceNss.db().toString(),
- cmdObj.obj(),
- ReadPreferenceSetting::secondaryPreferredMetadata(),
- opCtx,
- RemoteCommandRequest::kNoTimeout),
- stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback,
- this,
- stdx::placeholders::_1,
- cursorCommand),
- RemoteCommandRetryScheduler::makeRetryPolicy(
- numInitialSyncCollectionFindAttempts.load(),
- executor::RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::kAllRetriableErrors));
- auto scheduleStatus = _establishCollectionCursorsScheduler->startup();
- LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors;
+ auto&& scheduleResult =
+ _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
+ this,
+ stdx::placeholders::_1,
+ lastBatch,
+ onCompletionGuard));
+ if (!scheduleResult.isOK()) {
+ Status newStatus{scheduleResult.getStatus().code(),
+ str::stream() << "While cloning collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << scheduleResult.getStatus().reason()
+ << "'"};
- if (!scheduleStatus.isOK()) {
- _establishCollectionCursorsScheduler.reset();
- _finishCallback(scheduleStatus);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
return;
}
-}
-Status CollectionCloner::_parseCursorResponse(BSONObj response,
- std::vector<CursorResponse>* cursors,
- EstablishCursorsCommand cursorCommand) {
- switch (cursorCommand) {
- case Find: {
- StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response);
- if (!findResponse.isOK()) {
- Status errorStatus{findResponse.getStatus().code(),
- str::stream()
- << "While parsing the 'find' query against collection '"
- << _sourceNss.ns()
- << "' there was an error '"
- << findResponse.getStatus().reason()
- << "'"};
- return errorStatus;
- }
- cursors->push_back(std::move(findResponse.getValue()));
- break;
- }
- case ParallelCollScan: {
- auto cursorElements = _parseParallelCollectionScanResponse(response);
- if (!cursorElements.isOK()) {
- return cursorElements.getStatus();
- }
- std::vector<BSONElement> cursorsArray;
- cursorsArray = cursorElements.getValue();
- // Parse each BSONElement into a 'CursorResponse' object.
- for (BSONElement cursor : cursorsArray) {
- if (!cursor.isABSONObj()) {
- Status errorStatus(
- ErrorCodes::FailedToParse,
- "The 'cursor' field in the list of cursor responses is not a "
- "valid BSON Object");
- return errorStatus;
- }
- const BSONObj cursorObj = cursor.Obj().getOwned();
- StatusWith<CursorResponse> parallelCollScanResponse =
- CursorResponse::parseFromBSON(cursorObj);
- if (!parallelCollScanResponse.isOK()) {
- return parallelCollScanResponse.getStatus();
- }
- cursors->push_back(std::move(parallelCollScanResponse.getValue()));
+ MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterInitialFind, nssData) {
+ const BSONObj& data = nssData.getData();
+ auto nss = data["nss"].str();
+ // Only hang when cloning the specified collection.
+ if (_destNss.toString() == nss) {
+ while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterInitialFind) &&
+ !_isShuttingDown()) {
+ log() << "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for "
+ << nss << ". Blocking until fail point is disabled.";
+ mongo::sleepsecs(1);
}
- break;
- }
- default: {
- Status errorStatus(
- ErrorCodes::FailedToParse,
- "The command used to establish the collection cloner cursors is not valid.");
- return errorStatus;
}
}
- return Status::OK();
-}
-void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
- EstablishCursorsCommand cursorCommand) {
- if (_state == State::kShuttingDown) {
- Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."};
- _finishCallback(shuttingDownStatus);
- return;
- }
- auto response = rcbd.response;
- if (!response.isOK()) {
- _finishCallback(response.status);
- return;
- }
- Status commandStatus = getStatusFromCommandResult(response.data);
- if (!commandStatus.isOK()) {
- Status newStatus{commandStatus.code(),
- str::stream() << "While querying collection '" << _sourceNss.ns()
- << "' there was an error '"
- << commandStatus.reason()
- << "'"};
- _finishCallback(commandStatus);
- return;
+ if (!lastBatch) {
+ invariant(getMoreBob);
+ getMoreBob->append("getMore", batchData.cursorId);
+ getMoreBob->append("collection", batchData.nss.coll());
+ getMoreBob->append("batchSize", _batchSize);
}
+}
- std::vector<CursorResponse> cursorResponses;
- Status parseResponseStatus =
- _parseCursorResponse(response.data, &cursorResponses, cursorCommand);
- if (!parseResponseStatus.isOK()) {
- _finishCallback(parseResponseStatus);
+void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
+ if (!cbd.status.isOK()) {
+ _finishCallback(cbd.status);
return;
}
- LOG(1) << "Collection cloner running with " << cursorResponses.size()
- << " cursors established.";
-
- // Initialize the 'AsyncResultsMerger'(ARM).
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
- for (auto&& cursorResponse : cursorResponses) {
- // A placeholder 'ShardId' is used until the ARM is made less sharding specific.
- remoteCursors.emplace_back(
- ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
- }
-
- // An empty list of authenticated users is passed into the cluster parameters
- // as user information is not used in the ARM in context of collection cloning.
- _clusterClientCursorParams =
- stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
- _clusterClientCursorParams->remotes = std::move(remoteCursors);
- _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get());
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -589,134 +476,50 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// that will cause the destructor of the completion guard to run, the destructor must be run
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
- Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- if (!scheduleStatus.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
- return;
- }
-}
-
-StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse(
- BSONObj resp) {
- if (!resp.hasField("cursors")) {
- return Status(ErrorCodes::CursorNotFound,
- "The 'parallelCollectionScan' response does not contain a 'cursors' field.");
- }
- BSONElement response = resp["cursors"];
- if (response.type() == BSONType::Array) {
- return response.Array();
- } else {
- return Status(
- ErrorCodes::FailedToParse,
- "The 'parallelCollectionScan' response is unable to be transformed into an array.");
- }
-}
-
-Status CollectionCloner::_bufferNextBatchFromArm() {
- while (_arm->ready()) {
- auto armResultStatus = _arm->nextReady();
- if (!armResultStatus.getStatus().isOK()) {
- return armResultStatus.getStatus();
- }
- if (armResultStatus.getValue().isEOF()) {
- // We have reached the end of the batch.
- break;
- } else {
- auto queryResult = armResultStatus.getValue().getResult();
- _documentsToInsert.push_back(std::move(*queryResult));
- }
+ if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
+ warning()
+ << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
+ << this->_sourceNss;
}
- return Status::OK();
-}
-
-Status CollectionCloner::_scheduleNextARMResultsCallback(
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- auto nextEvent = _arm->nextEvent(opCtx);
- if (!nextEvent.isOK()) {
- return nextEvent.getStatus();
- }
- auto event = nextEvent.getValue();
- auto handleARMResultsOnNextEvent =
- _executor->onEvent(event,
- stdx::bind(&CollectionCloner::_handleARMResultsCallback,
- this,
- stdx::placeholders::_1,
- onCompletionGuard));
- return handleARMResultsOnNextEvent.getStatus();
-}
-
-void CollectionCloner::_handleARMResultsCallback(
- const executor::TaskExecutor::CallbackArgs& cbd,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard,
- Status status) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- guard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- };
+ auto status = _storageInterface->createCollectionForBulkLoading(
+ _destNss, _options, _idIndexSpec, _indexSpecs);
- if (!cbd.status.isOK()) {
- // Wait for active inserts to complete.
- waitForDbWorker();
- Status newStatus{cbd.status.code(),
- str::stream() << "While querying collection '" << _sourceNss.ns()
- << "' there was an error '"
- << cbd.status.reason()
- << "'"};
- setResultAndCancelRemainingWork(onCompletionGuard, cbd.status);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status.getStatus());
return;
}
- // Pull the documents from the ARM into a buffer until the entire batch has been processed.
- auto nextBatchStatus = _bufferNextBatchFromArm();
- if (!nextBatchStatus.isOK()) {
- setResultAndCancelRemainingWork(onCompletionGuard, nextBatchStatus);
- return;
+ _stats.indexes = _indexSpecs.size();
+ if (!_idIndexSpec.isEmpty()) {
+ ++_stats.indexes;
}
- bool lastBatch = _arm->remotesExhausted();
- auto&& scheduleResult =
- _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
- this,
- stdx::placeholders::_1,
- lastBatch,
- onCompletionGuard));
- if (!scheduleResult.isOK()) {
- Status newStatus{scheduleResult.getStatus().code(),
- str::stream() << "While cloning collection '" << _sourceNss.ns()
- << "' there was an error '"
- << scheduleResult.getStatus().reason()
- << "'"};
- setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus());
- return;
- }
+ _collLoader = std::move(status.getValue());
- MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
- const BSONObj& data = nssData.getData();
- auto nss = data["nss"].str();
- // Only hang when cloning the specified collection.
- if (_destNss.toString() == nss) {
- while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
- !_isShuttingDown()) {
- log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
- "enabled for "
- << nss << ". Blocking until fail point is disabled.";
- mongo::sleepsecs(1);
- }
- }
- }
+ _findFetcher = stdx::make_unique<Fetcher>(
+ _executor,
+ _source,
+ _sourceNss.db().toString(),
+ BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true << "batchSize" << _batchSize),
+ stdx::bind(&CollectionCloner::_findCallback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3,
+ onCompletionGuard),
+ ReadPreferenceSetting::secondaryPreferredMetadata(),
+ RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::makeRetryPolicy(
+ numInitialSyncCollectionFindAttempts.load(),
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors));
- // If the remote cursors are not exhausted, schedule this callback again to handle
- // the impending cursor response.
- if (!lastBatch) {
- Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- if (!scheduleStatus.isOK()) {
- setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus);
- return;
- }
+ Status scheduleStatus = _findFetcher->schedule();
+ if (!scheduleStatus.isOK()) {
+ _findFetcher.reset();
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
+ return;
}
}
@@ -730,21 +533,24 @@ void CollectionCloner::_insertDocumentsCallback(
return;
}
- UniqueLock lk(_mutex);
std::vector<BSONObj> docs;
- if (_documentsToInsert.size() == 0) {
+ UniqueLock lk(_mutex);
+ if (_documents.size() == 0) {
warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss;
+
if (lastBatch) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
return;
}
- _documentsToInsert.swap(docs);
+
+ _documents.swap(docs);
_stats.documentsCopied += docs.size();
++_stats.fetchBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend());
+
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status);
return;
@@ -764,10 +570,12 @@ void CollectionCloner::_insertDocumentsCallback(
}
}
- if (lastBatch) {
- // Clean up resources once the last batch has been copied over and set the status to OK.
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
+ if (!lastBatch) {
+ return;
}
+
+ // Done with last batch and time to set result in completion guard to Status::OK().
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
void CollectionCloner::_finishCallback(const Status& status) {
@@ -776,6 +584,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
// Copy the status so we can change it below if needed.
auto finalStatus = status;
bool callCollectionLoader = false;
+
decltype(_onCompletion) onCompletion;
{
LockGuard lk(_mutex);
@@ -786,6 +595,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
invariant(_onCompletion);
std::swap(_onCompletion, onCompletion);
}
+
if (callCollectionLoader) {
if (finalStatus.isOK()) {
const auto loaderStatus = _collLoader->commit();
@@ -799,6 +609,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
// This will release the resources held by the loader.
_collLoader.reset();
}
+
onCompletion(finalStatus);
// This will release the resources held by the callback function object. '_onCompletion' is
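To summarize the restored data path in this file: the Fetcher invokes _findCallback once per batch, and the callback keeps the cursor alive by appending a getMore command to the getMoreBob builder it is handed; appending nothing ends the fetch. A condensed sketch of a callback honoring that contract (stand-in buffering, illustrative batch size):

// Sketch of the Fetcher callback contract used by _findCallback above:
// fill in 'getMoreBob' to request the next batch, or leave it empty to stop.
void onBatch(const StatusWith<Fetcher::QueryResponse>& result,
             Fetcher::NextAction* nextAction,
             BSONObjBuilder* getMoreBob) {
    if (!result.isOK()) {
        return;  // surface the error through the completion guard instead
    }
    const auto& batch = result.getValue();
    // ... append batch.documents to the insert buffer here ...
    if (getMoreBob && *nextAction == Fetcher::NextAction::kGetMore) {
        getMoreBob->append("getMore", batch.cursorId);
        getMoreBob->append("collection", batch.nss.coll());
        getMoreBob->append("batchSize", 1024);  // illustrative
    }
}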
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 222886b9d72..af029b0f1cd 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -69,7 +69,6 @@ public:
/**
* Callback completion guard for CollectionCloner.
*/
- using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using OnCompletionGuard = CallbackCompletionGuard<Status>;
struct Stats {
@@ -112,8 +111,7 @@ public:
const CollectionOptions& options,
const CallbackFn& onCompletion,
StorageInterface* storageInterface,
- const int batchSize,
- const int maxNumClonerCursors);
+ const int batchSize);
virtual ~CollectionCloner();
@@ -148,14 +146,6 @@ public:
*/
void setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& scheduleDbWorkFn);
- /**
- * Returns the documents currently stored in the '_documents' buffer that is intended
- * to be inserted through the collection loader.
- *
- * For testing only.
- */
- std::vector<BSONObj> getDocumentsToInsert_forTest();
-
private:
bool _isActive_inlock() const;
@@ -183,6 +173,14 @@ private:
BSONObjBuilder* getMoreBob);
/**
+ * Read collection documents from find result.
+ */
+ void _findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard);
+
+ /**
* Request storage interface to create collection.
*
* Called multiple times if there are more than one batch of responses from listIndexes
@@ -194,59 +192,13 @@ private:
void _beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& callbackData);
/**
- * The possible command types that can be used to establish the initial cursors on the
- * remote collection.
- */
- enum EstablishCursorsCommand { Find, ParallelCollScan };
-
- /**
- * Parses the cursor responses from the 'find' or 'parallelCollectionScan' command
- * and passes them into the 'AsyncResultsMerger'.
- */
- void _establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
- EstablishCursorsCommand cursorCommand);
-
- /**
- * Parses the response from a 'parallelCollectionScan' command into a vector of cursor
- * elements.
- */
- StatusWith<std::vector<BSONElement>> _parseParallelCollectionScanResponse(BSONObj resp);
-
- /**
- * Takes a cursors buffer and parses the 'parallelCollectionScan' response into cursor
- * responses that are pushed onto the buffer.
- */
- Status _parseCursorResponse(BSONObj response,
- std::vector<CursorResponse>* cursors,
- EstablishCursorsCommand cursorCommand);
-
- /**
- * Calls to get the next event from the 'AsyncResultsMerger'. This schedules
- * '_handleAsyncResultsCallback' to be run when the event is signaled successfully.
- */
- Status _scheduleNextARMResultsCallback(std::shared_ptr<OnCompletionGuard> onCompletionGuard);
-
- /**
- * Runs for each time a new batch of documents can be retrieved from the 'AsyncResultsMerger'.
- * Buffers the documents retrieved for insertion and schedules a '_insertDocumentsCallback'
- * to insert the contents of the buffer.
- */
- void _handleARMResultsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard);
-
- /**
- * Pull all ready results from the ARM into a buffer to be inserted.
- */
- Status _bufferNextBatchFromArm();
-
- /**
- * Called whenever there is a new batch of documents ready from the 'AsyncResultsMerger'.
+ * Called multiple times if there are more than one batch of documents from the fetcher.
* On the last batch, 'lastBatch' will be true.
*
* Each document returned will be inserted via the storage interface.
*/
- void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd,
+ void _insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& callbackData,
bool lastBatch,
std::shared_ptr<OnCompletionGuard> onCompletionGuard);
@@ -279,30 +231,18 @@ private:
StorageInterface* _storageInterface; // (R) Not owned by us.
RemoteCommandRetryScheduler _countScheduler; // (S)
Fetcher _listIndexesFetcher; // (S)
+ std::unique_ptr<Fetcher> _findFetcher; // (M)
std::vector<BSONObj> _indexSpecs; // (M)
BSONObj _idIndexSpec; // (M)
- std::vector<BSONObj>
- _documentsToInsert; // (M) Documents read from 'AsyncResultsMerger' to insert.
- TaskRunner _dbWorkTaskRunner; // (R)
+ std::vector<BSONObj> _documents; // (M) Documents read from fetcher to insert.
+ TaskRunner _dbWorkTaskRunner; // (R)
ScheduleDbWorkFn
_scheduleDbWorkFn; // (RT) Function for scheduling database work using the executor.
Stats _stats; // (M) stats for this instance.
ProgressMeter _progressMeter; // (M) progress meter for this instance.
- const int _collectionCloningBatchSize; // (R) The size of the batches of documents returned in
- // collection cloning.
-
- // (R) The maximum number of cursors to use in the collection cloning process.
- const int _maxNumClonerCursors;
- // (M) Component responsible for fetching the documents from the collection cloner cursor(s).
- std::unique_ptr<AsyncResultsMerger> _arm;
- // (R) The cursor parameters used by the 'AsyncResultsMerger'.
- std::unique_ptr<ClusterClientCursorParams> _clusterClientCursorParams;
-
- // (M) The event handle for the 'kill' event of the 'AsyncResultsMerger'.
- executor::TaskExecutor::EventHandle _killArmHandle;
+ const int _batchSize;
- // (M) Scheduler used to establish the initial cursor or set of cursors.
- std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;
+ AsyncResultsMerger _arm;
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
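With the numCursors parameter gone, constructing a CollectionCloner after this revert takes a single batch-size knob, as in this sketch with placeholder arguments (the test fixture below shows the real call sites):

CollectionCloner cloner(executor,           // executor::TaskExecutor*
                        dbWorkThreadPool,   // OldThreadPool*
                        source,             // HostAndPort of the sync source
                        nss,                // NamespaceString to clone
                        options,            // CollectionOptions
                        onCompletion,       // callback invoked with the final Status
                        storageInterface,   // StorageInterface*
                        1024);              // batchSize (illustrative)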
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index d3e8966a304..4fb7a7fe208 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -66,12 +66,8 @@ protected:
void setUp() override;
void tearDown() override;
- // A simple arbitrary value to use as the default batch size.
- const int defaultBatchSize = 1024;
-
- // Running initial sync with a single cursor will default to using the 'find' command until
- // 'parallelCollectionScan' has more complete testing.
- const int defaultNumCloningCursors = 1;
+ // 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
+ const int defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
CollectionOptions options;
std::unique_ptr<CollectionCloner> collectionCloner;
@@ -91,8 +87,7 @@ void CollectionClonerTest::setUp() {
options,
stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
storageInterface.get(),
- defaultBatchSize,
- defaultNumCloningCursors);
+ defaultBatchSize);
collectionStats = CollectionMockStats();
storageInterface->createCollectionForBulkFn =
[this](const NamespaceString& nss,
@@ -119,7 +114,6 @@ BaseCloner* CollectionClonerTest::getCloner() const {
return collectionCloner.get();
}
-
TEST_F(CollectionClonerTest, InvalidConstruction) {
executor::TaskExecutor& executor = getExecutor();
auto pool = dbWorkThreadPool.get();
@@ -129,50 +123,29 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
// Null executor -- error from Fetcher, not CollectionCloner.
{
StorageInterface* si = storageInterface.get();
- ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(nullptr,
- pool,
- target,
- nss,
- options,
- cb,
- si,
- defaultBatchSize,
- defaultNumCloningCursors),
- UserException,
- ErrorCodes::BadValue,
- "task executor cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(nullptr, pool, target, nss, options, cb, si, defaultBatchSize),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
}
// Null storage interface
- ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor,
- pool,
- target,
- nss,
- options,
- cb,
- nullptr,
- defaultBatchSize,
- defaultNumCloningCursors),
- UserException,
- ErrorCodes::BadValue,
- "storage interface cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(&executor, pool, target, nss, options, cb, nullptr, defaultBatchSize),
+ UserException,
+ ErrorCodes::BadValue,
+ "storage interface cannot be null");
// Invalid namespace.
{
NamespaceString badNss("db.");
StorageInterface* si = storageInterface.get();
- ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor,
- pool,
- target,
- badNss,
- options,
- cb,
- si,
- defaultBatchSize,
- defaultNumCloningCursors),
- UserException,
- ErrorCodes::BadValue,
- "invalid collection namespace: db.");
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(&executor, pool, target, badNss, options, cb, si, defaultBatchSize),
+ UserException,
+ ErrorCodes::BadValue,
+ "invalid collection namespace: db.");
}
// Invalid collection options - error from CollectionOptions::validate(), not CollectionCloner.
@@ -182,15 +155,8 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
<< "not a document");
StorageInterface* si = storageInterface.get();
ASSERT_THROWS_CODE_AND_WHAT(
- CollectionCloner(&executor,
- pool,
- target,
- nss,
- invalidOptions,
- cb,
- si,
- defaultBatchSize,
- defaultNumCloningCursors),
+ CollectionCloner(
+ &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize),
UserException,
ErrorCodes::BadValue,
"'storageEngine.storageEngine1' has to be an embedded document.");
@@ -200,18 +166,11 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
{
CollectionCloner::CallbackFn nullCb;
StorageInterface* si = storageInterface.get();
- ASSERT_THROWS_CODE_AND_WHAT(CollectionCloner(&executor,
- pool,
- target,
- nss,
- options,
- nullCb,
- si,
- defaultBatchSize,
- defaultNumCloningCursors),
- UserException,
- ErrorCodes::BadValue,
- "callback function cannot be null");
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(&executor, pool, target, nss, options, nullCb, si, defaultBatchSize),
+ UserException,
+ ErrorCodes::BadValue,
+ "callback function cannot be null");
}
}
@@ -357,8 +316,7 @@ TEST_F(CollectionClonerTest,
options,
stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
storageInterface.get(),
- defaultBatchSize,
- defaultNumCloningCursors);
+ defaultBatchSize);
ASSERT_OK(collectionCloner->startup());
@@ -381,8 +339,7 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
options,
stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
storageInterface.get(),
- defaultBatchSize,
- defaultNumCloningCursors));
+ defaultBatchSize));
NamespaceString collNss;
CollectionOptions collOptions;
@@ -413,19 +370,10 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
}
collectionCloner->join();
ASSERT_EQUALS(1, collectionStats.insertCount);
@@ -751,7 +699,7 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
ASSERT_FALSE(net->hasReadyRequests());
}
-TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) {
+TEST_F(CollectionClonerTest, FindCommandFailed) {
ASSERT_OK(collectionCloner->startup());
{
@@ -799,15 +747,13 @@ TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError)
net->runReadyNetworkOperations();
ASSERT_TRUE(collectionCloner->isActive());
- // This check exists to ensure that the command used to establish the cursors is retried,
- // regardless of the command format. Therefore, it shouldn't be necessary to have a separate
- // similar test case for the 'parallelCollectionScan' command.
+ // Confirm that CollectionCloner resends the find request.
auto noi = net->getNextReadyRequest();
assertRemoteCommandNameEquals("find", noi->getRequest());
net->blackHole(noi);
}
-TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) {
+TEST_F(CollectionClonerTest, FindCommandCanceled) {
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
@@ -864,19 +810,10 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
});
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
}
ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
@@ -907,18 +844,9 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
});
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1))));
}
collectionCloner->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
@@ -947,18 +875,9 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
return Status(ErrorCodes::OperationFailed, "");
};
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(BSON("_id" << 1))));
}
collectionCloner->join();
@@ -983,19 +902,10 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
}
collectionCloner->join();
@@ -1023,15 +933,6 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1049,7 +950,7 @@ TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
const BSONObj doc2 = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch"));
}
collectionCloner->join();
@@ -1077,15 +978,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1110,9 +1002,10 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
+ BSONArray emptyArray;
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(emptyArray));
+ processNetworkResponse(createCursorResponse(0, emptyArray, "nextBatch"));
}
collectionCloner->join();
@@ -1138,15 +1031,6 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1159,6 +1043,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
ASSERT_TRUE(collectionCloner->isActive());
+ BSONArray emptyArray;
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
@@ -1173,7 +1058,7 @@ TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) {
const BSONObj doc2 = BSON("_id" << 2);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch"));
}
collectionCloner->join();
@@ -1212,15 +1097,6 @@ TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailu
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1))));
@@ -1280,8 +1156,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA
result = status;
},
storageInterface.get(),
- defaultBatchSize,
- defaultNumCloningCursors);
+ defaultBatchSize);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
@@ -1327,620 +1202,7 @@ TEST_F(CollectionClonerTest,
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // At this point, the CollectionCloner has sent the find request to establish the cursor.
- // We want to return the first batch of documents for the collection from the network so that
- // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
- // the next batch of documents.
-
- // Store the scheduled CollectionCloner::_insertDocuments task but do not run it yet.
- executor::TaskExecutor::CallbackFn insertDocumentsFn;
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- insertDocumentsFn = workFn;
- executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
- return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
- });
- ASSERT_FALSE(insertDocumentsFn);
-
- // Return first batch of collection documents from remote server for the getMore request.
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- assertRemoteCommandNameEquals(
- "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
- net->runReadyNetworkOperations();
- }
-
- // Confirm that CollectionCloner attempted to schedule _insertDocuments task.
- ASSERT_TRUE(insertDocumentsFn);
-
- // Return an error for the getMore request for the next batch of collection documents.
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- assertRemoteCommandNameEquals(
- "getMore",
- net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed")));
- net->runReadyNetworkOperations();
- }
-
- // CollectionCloner should still be active because we have not finished processing the
- // insertDocuments task.
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
-
- // Run the insertDocuments task. The final status of the CollectionCloner should match the first
- // error passed to the completion guard (ie. from the failed getMore request).
- executor::TaskExecutor::CallbackArgs callbackArgs(
- &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, ""));
- insertDocumentsFn(callbackArgs);
-
- // Reset 'insertDocumentsFn' to release last reference count on completion guard.
- insertDocumentsFn = {};
-
- // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback
- // synchronously.
-
- ASSERT_FALSE(collectionCloner->isActive());
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
-}
-
-class ParallelCollectionClonerTest : public BaseClonerTest {
-public:
- BaseCloner* getCloner() const override;
-
-protected:
- void setUp() override;
- void tearDown() override;
- std::vector<BSONObj> generateDocs(std::size_t numDocs);
-
- // A simple arbitrary value to use as the default batch size.
- const int defaultBatchSize = 1024;
-
- // Running initial sync with a single cursor will default to using the 'find' command until
- // 'parallelCollectionScan' has more complete testing.
- const int defaultNumCloningCursors = 3;
-
- CollectionOptions options;
- std::unique_ptr<CollectionCloner> collectionCloner;
- CollectionMockStats collectionStats; // Used by the _loader.
- CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner.
-};
-
-void ParallelCollectionClonerTest::setUp() {
- BaseClonerTest::setUp();
- options = {};
- collectionCloner.reset(nullptr);
- collectionCloner = stdx::make_unique<CollectionCloner>(
- &getExecutor(),
- dbWorkThreadPool.get(),
- target,
- nss,
- options,
- stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
- storageInterface.get(),
- defaultBatchSize,
- defaultNumCloningCursors);
- collectionStats = CollectionMockStats();
- storageInterface->createCollectionForBulkFn =
- [this](const NamespaceString& nss,
- const CollectionOptions& options,
- const BSONObj idIndexSpec,
- const std::vector<BSONObj>& secondaryIndexSpecs) {
- _loader = new CollectionBulkLoaderMock(&collectionStats);
- Status initCollectionBulkLoader = _loader->init(secondaryIndexSpecs);
- ASSERT_OK(initCollectionBulkLoader);
-
- return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
- std::unique_ptr<CollectionBulkLoader>(_loader));
- };
-}
-
-void ParallelCollectionClonerTest::tearDown() {
- BaseClonerTest::tearDown();
- // Executor may still invoke collection cloner's callback before shutting down.
- collectionCloner.reset(nullptr);
- options = {};
-}
-
-BaseCloner* ParallelCollectionClonerTest::getCloner() const {
- return collectionCloner.get();
-}
-
-std::vector<BSONObj> ParallelCollectionClonerTest::generateDocs(std::size_t numDocs) {
- std::vector<BSONObj> docs;
- for (unsigned int i = 0; i < numDocs; i++) {
- docs.push_back(BSON("_id" << i));
- }
- return docs;
-}
-
-TEST_F(ParallelCollectionClonerTest, InsertDocumentsSingleBatchWithMultipleCloningCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- // A single cursor response is returned because there is only a single document to insert.
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(
- BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- docs = collectionCloner->getDocumentsToInsert_forTest();
- return exec->scheduleWork(workFn);
- });
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
-
- collectionCloner->join();
-
- ASSERT_BSONOBJ_EQ(docs[0], doc);
- ASSERT_EQUALS(1, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(ParallelCollectionClonerTest,
- InsertDocumentsSingleBatchOfMultipleDocumentsWithMultipleCloningCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- // A single cursor response is returned because there is only a single batch of documents to
- // insert.
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(
- BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)) << "ok" << 1));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- docs = collectionCloner->getDocumentsToInsert_forTest();
- return exec->scheduleWork(workFn);
- });
-
- auto generatedDocs = generateDocs(3);
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(
- BSON_ARRAY(generatedDocs[0] << generatedDocs[1] << generatedDocs[2])));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(docs[i], generatedDocs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDifferentNumberOfBatches) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
-
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- auto buffered = collectionCloner->getDocumentsToInsert_forTest();
- docs.insert(docs.end(), buffered.begin(), buffered.end());
- return exec->scheduleWork(workFn);
- });
-
- int numDocs = 9;
- std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch"));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch"));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch"));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch"));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(6U, docs.size());
- for (int i = 3; i < 6; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(6, collectionStats.insertCount);
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[6]), "nextBatch"));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[7])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(8U, docs.size());
- for (int i = 6; i < 8; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(8, collectionStats.insertCount);
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[8])));
- }
-
- collectionCloner->join();
- ASSERT_EQUALS(9U, docs.size());
- ASSERT_BSONOBJ_EQ(generatedDocs[8], docs[8]);
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
- collectionCloner->waitForDbWorker();
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- auto buffered = collectionCloner->getDocumentsToInsert_forTest();
- docs.insert(docs.end(), buffered.begin(), buffered.end());
- return exec->scheduleWork(workFn);
- });
-
- ASSERT_TRUE(collectionCloner->isActive());
-
- int numDocs = 6;
- std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0])));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1])));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch"));
- processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5])));
- }
-
- collectionCloner->join();
-
- ASSERT_EQUALS(6U, docs.size());
- for (int i = 3; i < 6; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
-
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
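
The inline BSON("cursors" << ... << "ok" << 1) objects above model the establishment reply that hands the cloner its parallel cursors: an array of per-cursor reply documents plus the command's ok field. A small helper in the same spirit (hedged sketch; the helper name is hypothetical):

    // Wraps one cursor reply per parallel cursor, matching the shape the
    // tests assemble inline for the multi-cursor establishment response.
    BSONObj makeMultiCursorReply(const std::vector<BSONObj>& cursorReplies) {
        BSONArrayBuilder cursors;
        for (const auto& reply : cursorReplies) {
            cursors.append(reply);
        }
        return BSON("cursors" << cursors.arr() << "ok" << 1);
    }
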
-
-TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto exec = &getExecutor();
- std::vector<BSONObj> docs;
- // Record the buffered documents before they are inserted so we can
- // validate them.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [&](const executor::TaskExecutor::CallbackFn& workFn) {
- auto buffered = collectionCloner->getDocumentsToInsert_forTest();
- docs.insert(docs.end(), buffered.begin(), buffered.end());
- return exec->scheduleWork(workFn);
- });
-
- int numDocs = 6;
- std::vector<BSONObj> generatedDocs = generateDocs(numDocs);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]), "nextBatch"));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]), "nextBatch"));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]), "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3U, docs.size());
- for (int i = 0; i < 3; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[3]), "nextBatch"));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[4]), "nextBatch"));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[5]), "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(6U, docs.size());
- for (int i = 3; i < 6; i++) {
- ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]);
- }
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(emptyArray));
- processNetworkResponse(createFinalCursorResponse(emptyArray));
- processNetworkResponse(createFinalCursorResponse(emptyArray));
- }
-
- collectionCloner->join();
- ASSERT_EQUALS(6, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
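
Both multi-cursor batching tests rely on the same trick: swapping in a scheduleDbWork hook that snapshots the cloner's buffered documents before delegating to the real executor, since the buffer is emptied once the insert task runs. The same shape can verify other invariants; for instance, counting scheduled insert batches (a hedged variation on the hooks above, reusing the fixture's cloner and executor):

    int scheduledBatches = 0;
    collectionCloner->setScheduleDbWorkFn_forTest(
        [&](const executor::TaskExecutor::CallbackFn& workFn) {
            ++scheduledBatches;  // one insert task per buffered batch
            return exec->scheduleWork(workFn);
        });
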
-
-TEST_F(ParallelCollectionClonerTest, InsertDocumentsScheduleDbWorkFailedWithMultipleCursors) {
- ASSERT_OK(collectionCloner->startup());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
-
- collectionCloner->waitForDbWorker();
-
-    // Replace the scheduleDbWork function so that the cloner fails to schedule the DB work
-    // task after receiving documents.
- collectionCloner->setScheduleDbWorkFn_forTest(
- [](const executor::TaskExecutor::CallbackFn& workFn) {
- return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
- });
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
-
- ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
-}
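
The UnknownError the test asserts on comes straight from the injected hook: when scheduling the insert task yields a non-OK StatusWith, the cloner gives up and reports that status through its completion callback. A reduced sketch of that propagation, with an illustrative alias and function name rather than the cloner's real internals:

    using ScheduleFn = stdx::function<StatusWith<executor::TaskExecutor::CallbackHandle>(
        const executor::TaskExecutor::CallbackFn&)>;

    // A failed schedule becomes the status the completion callback reports.
    Status runInsertStep(const ScheduleFn& scheduleFn,
                         const executor::TaskExecutor::CallbackFn& workFn) {
        auto swHandle = scheduleFn(workFn);
        if (!swHandle.isOK()) {
            return swHandle.getStatus();  // e.g. the UnknownError injected above
        }
        return Status::OK();
    }
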
-
-TEST_F(ParallelCollectionClonerTest,
- CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto net = getNet();
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
-
- assertRemoteCommandNameEquals("count",
- net->scheduleSuccessfulResponse(createCountResponse(0)));
- net->runReadyNetworkOperations();
-
- assertRemoteCommandNameEquals(
- "listIndexes",
- net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))));
- net->runReadyNetworkOperations();
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // At this point, the CollectionCloner has sent the find request to establish the cursor.
+ // At this point, the CollectionCloner has sent the find request for the collection documents.
// We want to return the first batch of documents for the collection from the network so that
// the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
// the next batch of documents.
@@ -1955,13 +1217,13 @@ TEST_F(ParallelCollectionClonerTest,
});
ASSERT_FALSE(insertDocumentsFn);
- // Return first batch of collection documents from remote server for the getMore request.
+ // Return first batch of collection documents from remote server for the find request.
const BSONObj doc = BSON("_id" << 1);
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
assertRemoteCommandNameEquals(
- "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
+ "find", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
net->runReadyNetworkOperations();
}
@@ -1999,72 +1261,4 @@ TEST_F(ParallelCollectionClonerTest,
ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
}
-TEST_F(ParallelCollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) {
- // First cloning attempt - fails while reading documents from source collection.
- unittest::log() << "Starting first collection cloning attempt";
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray)
- << createCursorResponse(2, emptyArray)
- << createCursorResponse(3, emptyArray))
- << "ok"
- << 1));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- int numDocs = 5;
- std::vector<BSONObj> docs = generateDocs(numDocs);
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(docs[0])));
- processNetworkResponse(createCursorResponse(2, BSON_ARRAY(docs[1])));
- processNetworkResponse(createCursorResponse(3, BSON_ARRAY(docs[2])));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(3, collectionStats.insertCount);
-
- // Check that the status hasn't changed from the initial value.
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[3])));
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(docs[4])));
- processNetworkResponse(ErrorCodes::OperationFailed,
- "failed to read remaining documents from source collection");
- }
-
- collectionCloner->join();
- ASSERT_EQUALS(numDocs, collectionStats.insertCount);
-
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-
-    // Second cloning attempt - startup() should fail since the cloner cannot be restarted.
- unittest::log() << "Starting second collection cloning attempt - startup() should fail";
- collectionStats = CollectionMockStats();
- setStatus(getDetectableErrorStatus());
-
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup());
-}
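
The final assertion pins down that the cloner is single-use: once it has run, successfully or not, startup() refuses to run again instead of resetting state. A minimal sketch of such a one-shot guard; the enum and function are illustrative, not the cloner's actual members:

    enum class LifecycleState { kPreStart, kRunning, kComplete };

    // startup() is only legal before the first run; afterwards the cloner
    // reports ShutdownInProgress rather than restarting.
    Status startupOnce(LifecycleState& state) {
        if (state != LifecycleState::kPreStart) {
            return Status(ErrorCodes::ShutdownInProgress, "cloner cannot be restarted");
        }
        state = LifecycleState::kRunning;
        return Status::OK();
    }
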
-
} // namespace
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index bc7ea109918..0ebaf66e098 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -68,9 +68,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, defaultBat
// The number of attempts for the listCollections commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3);
-// The number of cursors to use in the collection cloning process.
-MONGO_EXPORT_SERVER_PARAMETER(maxNumInitialSyncCollectionClonerCursors, int, 1);
-
/**
* Default listCollections predicate.
*/
@@ -367,8 +364,7 @@ void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryRes
stdx::bind(
&DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss),
_storageInterface,
- collectionClonerBatchSize,
- maxNumInitialSyncCollectionClonerCursors.load());
+ collectionClonerBatchSize);
} catch (const UserException& ex) {
_finishCallback_inlock(lk, ex.toStatus());
return;
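
Note the asymmetry the revert restores at this call site: collectionClonerBatchSize, a startup-only parameter, is passed directly, while the removed maxNumInitialSyncCollectionClonerCursors, a runtime-settable parameter, had to be read through load() because runtime parameters are atomic-backed. Both declaration styles, with illustrative names and defaults:

    // Startup-only: a plain int, fixed once the server has started.
    MONGO_EXPORT_STARTUP_SERVER_PARAMETER(exampleBatchSize, int, 16);

    // Runtime-settable: atomic-backed, so readers snapshot it with load().
    MONGO_EXPORT_SERVER_PARAMETER(exampleNumCursors, int, 1);

    int currentNumCursors() {
        return exampleNumCursors.load();
    }
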
diff --git a/src/mongo/db/s/sharding_task_executor.cpp b/src/mongo/db/s/sharding_task_executor.cpp
index 0769a42ed40..24fc512459b 100644
--- a/src/mongo/db/s/sharding_task_executor.cpp
+++ b/src/mongo/db/s/sharding_task_executor.cpp
@@ -38,11 +38,8 @@
#include "mongo/db/logical_time.h"
#include "mongo/db/operation_time_tracker.h"
#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/sharding_metadata.h"
-#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_last_error_info.h"
-#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -119,17 +116,10 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom
const TaskExecutor::RemoteCommandCallbackArgs& args) {
ON_BLOCK_EXIT([&cb, &args]() { cb(args); });
- auto shard = grid.shardRegistry()->getShardForHostNoReload(request.target);
- if (!shard) {
- LOG(1) << "Could not find shard containing host: " << request.target.toString();
- return;
- }
if (!args.response.isOK()) {
- shard->updateReplSetMonitor(request.target, args.response.status);
LOG(1) << "Error processing the remote request, not updating operationTime or gLE";
return;
}
- shard->updateReplSetMonitor(request.target, getStatusFromCommandResult(args.response.data));
// Update the logical clock.
invariant(timeTracker);
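
The ON_BLOCK_EXIT at the top of the wrapped callback is what makes the removed early return safe: the guard runs the caller's cb(args) whenever the enclosing scope unwinds, so every exit path, early or not, still delivers the callback. A reduced illustration of the pattern; the function and counter are illustrative:

    #include "mongo/util/scopeguard.h"

    void handleResponseExample(bool responseOk, int* callbacksDelivered) {
        // Runs when the scope unwinds -- on the early return below as well.
        ON_BLOCK_EXIT([callbacksDelivered]() { ++*callbacksDelivered; });
        if (!responseOk) {
            return;  // guard still fires here
        }
        // ... normal processing ...
    }
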