summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner.cpp
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2017-08-11 17:09:56 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2017-08-11 17:09:56 -0400
commit8ddfeab4644aa38082b3cd03cfeef1dd6c65d35a (patch)
tree0be30537fbd87401cf57ac1cd3d414884eaab2dd /src/mongo/db/repl/collection_cloner.cpp
parent0d3137df3879e86d92904309e968f25529904639 (diff)
downloadmongo-8ddfeab4644aa38082b3cd03cfeef1dd6c65d35a.tar.gz
Revert "SERVER-29617 replace fetcher with ARM and add numCursors server parameter"
This reverts commit 0d3137df3879e86d92904309e968f25529904639.
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp417
1 files changed, 114 insertions, 303 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 98d658b49f9..13b27d0ed01 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -74,9 +74,9 @@ MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncCollectionFindAttempts, int, 3);
// collection 'namespace'.
MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
-// Failpoint which causes initial sync to hang after handling the next batch of results from the
-// 'AsyncResultsMerger' for a specific collection.
-MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
+// Failpoint which causes initial sync to hang after the initial 'find' command of collection
+// cloning, for a specific collection.
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterInitialFind);
CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
OldThreadPool* dbWorkThreadPool,
@@ -85,8 +85,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
const CollectionOptions& options,
const CallbackFn& onCompletion,
StorageInterface* storageInterface,
- const int batchSize,
- const int maxNumClonerCursors)
+ const int batchSize)
: _executor(executor),
_dbWorkThreadPool(dbWorkThreadPool),
_source(source),
@@ -123,7 +122,7 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
executor::RemoteCommandRequest::kNoTimeout,
RemoteCommandRetryScheduler::kAllRetriableErrors)),
_indexSpecs(),
- _documentsToInsert(),
+ _documents(),
_dbWorkTaskRunner(_dbWorkThreadPool),
_scheduleDbWorkFn([this](const executor::TaskExecutor::CallbackFn& work) {
auto task = [work](OperationContext* opCtx,
@@ -139,8 +138,9 @@ CollectionCloner::CollectionCloner(executor::TaskExecutor* executor,
kProgressMeterCheckInterval,
"documents copied",
str::stream() << _sourceNss.toString() << " collection clone progress"),
- _collectionCloningBatchSize(batchSize),
- _maxNumClonerCursors(maxNumClonerCursors) {
+ _batchSize(batchSize),
+ _arm(executor,
+ stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()).get()) {
// Fetcher throws an exception on null executor.
invariant(executor);
uassert(ErrorCodes::BadValue,
@@ -215,19 +215,15 @@ void CollectionCloner::shutdown() {
// Nothing to do if we are already in ShuttingDown or Complete state.
return;
}
+
_cancelRemainingWork_inlock();
}
void CollectionCloner::_cancelRemainingWork_inlock() {
- if (_arm) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _killArmHandle = _arm->kill(opCtx);
- }
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
- if (_establishCollectionCursorsScheduler) {
- _establishCollectionCursorsScheduler->shutdown();
+ if (_findFetcher) {
+ _findFetcher->shutdown();
}
_dbWorkTaskRunner.cancel();
}
@@ -239,9 +235,6 @@ CollectionCloner::Stats CollectionCloner::getStats() const {
void CollectionCloner::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_killArmHandle) {
- _executor->waitForEvent(_killArmHandle);
- }
_condition.wait(lk, [this]() { return !_isActive_inlock(); });
}
@@ -257,11 +250,6 @@ void CollectionCloner::setScheduleDbWorkFn_forTest(const ScheduleDbWorkFn& sched
_scheduleDbWorkFn = scheduleDbWorkFn;
}
-std::vector<BSONObj> CollectionCloner::getDocumentsToInsert_forTest() {
- LockGuard lk(_mutex);
- return _documentsToInsert;
-}
-
void CollectionCloner::_countCallback(
const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
@@ -403,181 +391,80 @@ void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus&
}
}
-void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
- if (!cbd.status.isOK()) {
- _finishCallback(cbd.status);
- return;
- }
- if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
- warning()
- << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
- << this->_sourceNss;
- }
+void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ if (!fetchResult.isOK()) {
+ // Wait for active inserts to complete.
+ waitForDbWorker();
- auto collectionBulkLoader = _storageInterface->createCollectionForBulkLoading(
- _destNss, _options, _idIndexSpec, _indexSpecs);
+ Status newStatus{fetchResult.getStatus().code(),
+ str::stream() << "While querying collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << fetchResult.getStatus().reason()
+ << "'"};
- if (!collectionBulkLoader.isOK()) {
- _finishCallback(collectionBulkLoader.getStatus());
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
return;
}
- _stats.indexes = _indexSpecs.size();
- if (!_idIndexSpec.isEmpty()) {
- ++_stats.indexes;
- }
-
- _collLoader = std::move(collectionBulkLoader.getValue());
-
- BSONObjBuilder cmdObj;
- EstablishCursorsCommand cursorCommand;
- // The 'find' command is used when the number of cloning cursors is 1 to ensure
- // the correctness of the collection cloning process until 'parallelCollectionScan'
- // can be tested more extensively in context of initial sync.
- if (_maxNumClonerCursors == 1) {
- cmdObj.append("find", _sourceNss.coll());
- cmdObj.append("noCursorTimeout", true);
- // Set batchSize to be 0 to establish the cursor without fetching any documents,
- // similar to the response format of 'parallelCollectionScan'.
- cmdObj.append("batchSize", 0);
- cursorCommand = Find;
- } else {
- cmdObj.append("parallelCollectionScan", _sourceNss.coll());
- cmdObj.append("numCursors", _maxNumClonerCursors);
- cursorCommand = ParallelCollScan;
+ auto batchData(fetchResult.getValue());
+ bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction;
+ if (batchData.documents.size() > 0) {
+ LockGuard lk(_mutex);
+ _documents.insert(_documents.end(), batchData.documents.begin(), batchData.documents.end());
+ } else if (!batchData.first) {
+ warning() << "No documents returned in batch; ns: " << _sourceNss
+ << ", cursorId:" << batchData.cursorId << ", isLastBatch:" << lastBatch;
}
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
-
- _establishCollectionCursorsScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
- _executor,
- RemoteCommandRequest(_source,
- _sourceNss.db().toString(),
- cmdObj.obj(),
- ReadPreferenceSetting::secondaryPreferredMetadata(),
- opCtx,
- RemoteCommandRequest::kNoTimeout),
- stdx::bind(&CollectionCloner::_establishCollectionCursorsCallback,
- this,
- stdx::placeholders::_1,
- cursorCommand),
- RemoteCommandRetryScheduler::makeRetryPolicy(
- numInitialSyncCollectionFindAttempts.load(),
- executor::RemoteCommandRequest::kNoTimeout,
- RemoteCommandRetryScheduler::kAllRetriableErrors));
- auto scheduleStatus = _establishCollectionCursorsScheduler->startup();
- LOG(1) << "Attempting to establish cursors with maxNumClonerCursors: " << _maxNumClonerCursors;
+ auto&& scheduleResult =
+ _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
+ this,
+ stdx::placeholders::_1,
+ lastBatch,
+ onCompletionGuard));
+ if (!scheduleResult.isOK()) {
+ Status newStatus{scheduleResult.getStatus().code(),
+ str::stream() << "While cloning collection '" << _sourceNss.ns()
+ << "' there was an error '"
+ << scheduleResult.getStatus().reason()
+ << "'"};
- if (!scheduleStatus.isOK()) {
- _establishCollectionCursorsScheduler.reset();
- _finishCallback(scheduleStatus);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, newStatus);
return;
}
-}
-Status CollectionCloner::_parseCursorResponse(BSONObj response,
- std::vector<CursorResponse>* cursors,
- EstablishCursorsCommand cursorCommand) {
- switch (cursorCommand) {
- case Find: {
- StatusWith<CursorResponse> findResponse = CursorResponse::parseFromBSON(response);
- if (!findResponse.isOK()) {
- Status errorStatus{findResponse.getStatus().code(),
- str::stream()
- << "While parsing the 'find' query against collection '"
- << _sourceNss.ns()
- << "' there was an error '"
- << findResponse.getStatus().reason()
- << "'"};
- return errorStatus;
- }
- cursors->push_back(std::move(findResponse.getValue()));
- break;
- }
- case ParallelCollScan: {
- auto cursorElements = _parseParallelCollectionScanResponse(response);
- if (!cursorElements.isOK()) {
- return cursorElements.getStatus();
- }
- std::vector<BSONElement> cursorsArray;
- cursorsArray = cursorElements.getValue();
- // Parse each BSONElement into a 'CursorResponse' object.
- for (BSONElement cursor : cursorsArray) {
- if (!cursor.isABSONObj()) {
- Status errorStatus(
- ErrorCodes::FailedToParse,
- "The 'cursor' field in the list of cursor responses is not a "
- "valid BSON Object");
- return errorStatus;
- }
- const BSONObj cursorObj = cursor.Obj().getOwned();
- StatusWith<CursorResponse> parallelCollScanResponse =
- CursorResponse::parseFromBSON(cursorObj);
- if (!parallelCollScanResponse.isOK()) {
- return parallelCollScanResponse.getStatus();
- }
- cursors->push_back(std::move(parallelCollScanResponse.getValue()));
+ MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterInitialFind, nssData) {
+ const BSONObj& data = nssData.getData();
+ auto nss = data["nss"].str();
+ // Only hang when cloning the specified collection.
+ if (_destNss.toString() == nss) {
+ while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterInitialFind) &&
+ !_isShuttingDown()) {
+ log() << "initialSyncHangCollectionClonerAfterInitialFind fail point enabled for "
+ << nss << ". Blocking until fail point is disabled.";
+ mongo::sleepsecs(1);
}
- break;
- }
- default: {
- Status errorStatus(
- ErrorCodes::FailedToParse,
- "The command used to establish the collection cloner cursors is not valid.");
- return errorStatus;
}
}
- return Status::OK();
-}
-void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCallbackArgs& rcbd,
- EstablishCursorsCommand cursorCommand) {
- if (_state == State::kShuttingDown) {
- Status shuttingDownStatus{ErrorCodes::CallbackCanceled, "Cloner shutting down."};
- _finishCallback(shuttingDownStatus);
- return;
- }
- auto response = rcbd.response;
- if (!response.isOK()) {
- _finishCallback(response.status);
- return;
- }
- Status commandStatus = getStatusFromCommandResult(response.data);
- if (!commandStatus.isOK()) {
- Status newStatus{commandStatus.code(),
- str::stream() << "While querying collection '" << _sourceNss.ns()
- << "' there was an error '"
- << commandStatus.reason()
- << "'"};
- _finishCallback(commandStatus);
- return;
+ if (!lastBatch) {
+ invariant(getMoreBob);
+ getMoreBob->append("getMore", batchData.cursorId);
+ getMoreBob->append("collection", batchData.nss.coll());
+ getMoreBob->append("batchSize", _batchSize);
}
+}
- std::vector<CursorResponse> cursorResponses;
- Status parseResponseStatus =
- _parseCursorResponse(response.data, &cursorResponses, cursorCommand);
- if (!parseResponseStatus.isOK()) {
- _finishCallback(parseResponseStatus);
+void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::CallbackArgs& cbd) {
+ if (!cbd.status.isOK()) {
+ _finishCallback(cbd.status);
return;
}
- LOG(1) << "Collection cloner running with " << cursorResponses.size()
- << " cursors established.";
-
- // Initialize the 'AsyncResultsMerger'(ARM).
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
- for (auto&& cursorResponse : cursorResponses) {
- // A placeholder 'ShardId' is used until the ARM is made less sharding specific.
- remoteCursors.emplace_back(
- ShardId("CollectionClonerSyncSource"), _source, std::move(cursorResponse));
- }
-
- // An empty list of authenticated users is passed into the cluster parameters
- // as user information is not used in the ARM in context of collection cloning.
- _clusterClientCursorParams =
- stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
- _clusterClientCursorParams->remotes = std::move(remoteCursors);
- _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get());
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -589,134 +476,50 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// that will cause the destructor of the completion guard to run, the destructor must be run
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
- Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- if (!scheduleStatus.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
- return;
- }
-}
-
-StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionScanResponse(
- BSONObj resp) {
- if (!resp.hasField("cursors")) {
- return Status(ErrorCodes::CursorNotFound,
- "The 'parallelCollectionScan' response does not contain a 'cursors' field.");
- }
- BSONElement response = resp["cursors"];
- if (response.type() == BSONType::Array) {
- return response.Array();
- } else {
- return Status(
- ErrorCodes::FailedToParse,
- "The 'parallelCollectionScan' response is unable to be transformed into an array.");
- }
-}
-
-Status CollectionCloner::_bufferNextBatchFromArm() {
- while (_arm->ready()) {
- auto armResultStatus = _arm->nextReady();
- if (!armResultStatus.getStatus().isOK()) {
- return armResultStatus.getStatus();
- }
- if (armResultStatus.getValue().isEOF()) {
- // We have reached the end of the batch.
- break;
- } else {
- auto queryResult = armResultStatus.getValue().getResult();
- _documentsToInsert.push_back(std::move(*queryResult));
- }
+ if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
+ warning()
+ << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
+ << this->_sourceNss;
}
- return Status::OK();
-}
-
-Status CollectionCloner::_scheduleNextARMResultsCallback(
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- auto nextEvent = _arm->nextEvent(opCtx);
- if (!nextEvent.isOK()) {
- return nextEvent.getStatus();
- }
- auto event = nextEvent.getValue();
- auto handleARMResultsOnNextEvent =
- _executor->onEvent(event,
- stdx::bind(&CollectionCloner::_handleARMResultsCallback,
- this,
- stdx::placeholders::_1,
- onCompletionGuard));
- return handleARMResultsOnNextEvent.getStatus();
-}
-
-void CollectionCloner::_handleARMResultsCallback(
- const executor::TaskExecutor::CallbackArgs& cbd,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- auto setResultAndCancelRemainingWork = [this](std::shared_ptr<OnCompletionGuard> guard,
- Status status) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- guard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- };
+ auto status = _storageInterface->createCollectionForBulkLoading(
+ _destNss, _options, _idIndexSpec, _indexSpecs);
- if (!cbd.status.isOK()) {
- // Wait for active inserts to complete.
- waitForDbWorker();
- Status newStatus{cbd.status.code(),
- str::stream() << "While querying collection '" << _sourceNss.ns()
- << "' there was an error '"
- << cbd.status.reason()
- << "'"};
- setResultAndCancelRemainingWork(onCompletionGuard, cbd.status);
+ if (!status.isOK()) {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status.getStatus());
return;
}
- // Pull the documents from the ARM into a buffer until the entire batch has been processed.
- auto nextBatchStatus = _bufferNextBatchFromArm();
- if (!nextBatchStatus.isOK()) {
- setResultAndCancelRemainingWork(onCompletionGuard, nextBatchStatus);
- return;
+ _stats.indexes = _indexSpecs.size();
+ if (!_idIndexSpec.isEmpty()) {
+ ++_stats.indexes;
}
- bool lastBatch = _arm->remotesExhausted();
- auto&& scheduleResult =
- _scheduleDbWorkFn(stdx::bind(&CollectionCloner::_insertDocumentsCallback,
- this,
- stdx::placeholders::_1,
- lastBatch,
- onCompletionGuard));
- if (!scheduleResult.isOK()) {
- Status newStatus{scheduleResult.getStatus().code(),
- str::stream() << "While cloning collection '" << _sourceNss.ns()
- << "' there was an error '"
- << scheduleResult.getStatus().reason()
- << "'"};
- setResultAndCancelRemainingWork(onCompletionGuard, scheduleResult.getStatus());
- return;
- }
+ _collLoader = std::move(status.getValue());
- MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
- const BSONObj& data = nssData.getData();
- auto nss = data["nss"].str();
- // Only hang when cloning the specified collection.
- if (_destNss.toString() == nss) {
- while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
- !_isShuttingDown()) {
- log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
- "enabled for "
- << nss << ". Blocking until fail point is disabled.";
- mongo::sleepsecs(1);
- }
- }
- }
+ _findFetcher = stdx::make_unique<Fetcher>(
+ _executor,
+ _source,
+ _sourceNss.db().toString(),
+ BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true << "batchSize" << _batchSize),
+ stdx::bind(&CollectionCloner::_findCallback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3,
+ onCompletionGuard),
+ ReadPreferenceSetting::secondaryPreferredMetadata(),
+ RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::makeRetryPolicy(
+ numInitialSyncCollectionFindAttempts.load(),
+ executor::RemoteCommandRequest::kNoTimeout,
+ RemoteCommandRetryScheduler::kAllRetriableErrors));
- // If the remote cursors are not exhausted, schedule this callback again to handle
- // the impending cursor response.
- if (!lastBatch) {
- Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
- if (!scheduleStatus.isOK()) {
- setResultAndCancelRemainingWork(onCompletionGuard, scheduleStatus);
- return;
- }
+ Status scheduleStatus = _findFetcher->schedule();
+ if (!scheduleStatus.isOK()) {
+ _findFetcher.reset();
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
+ return;
}
}
@@ -730,21 +533,24 @@ void CollectionCloner::_insertDocumentsCallback(
return;
}
- UniqueLock lk(_mutex);
std::vector<BSONObj> docs;
- if (_documentsToInsert.size() == 0) {
+ UniqueLock lk(_mutex);
+ if (_documents.size() == 0) {
warning() << "_insertDocumentsCallback, but no documents to insert for ns:" << _destNss;
+
if (lastBatch) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
return;
}
- _documentsToInsert.swap(docs);
+
+ _documents.swap(docs);
_stats.documentsCopied += docs.size();
++_stats.fetchBatches;
_progressMeter.hit(int(docs.size()));
invariant(_collLoader);
const auto status = _collLoader->insertDocuments(docs.cbegin(), docs.cend());
+
if (!status.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, status);
return;
@@ -764,10 +570,12 @@ void CollectionCloner::_insertDocumentsCallback(
}
}
- if (lastBatch) {
- // Clean up resources once the last batch has been copied over and set the status to OK.
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
+ if (!lastBatch) {
+ return;
}
+
+ // Done with last batch and time to set result in completion guard to Status::OK().
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, Status::OK());
}
void CollectionCloner::_finishCallback(const Status& status) {
@@ -776,6 +584,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
// Copy the status so we can change it below if needed.
auto finalStatus = status;
bool callCollectionLoader = false;
+
decltype(_onCompletion) onCompletion;
{
LockGuard lk(_mutex);
@@ -786,6 +595,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
invariant(_onCompletion);
std::swap(_onCompletion, onCompletion);
}
+
if (callCollectionLoader) {
if (finalStatus.isOK()) {
const auto loaderStatus = _collLoader->commit();
@@ -799,6 +609,7 @@ void CollectionCloner::_finishCallback(const Status& status) {
// This will release the resources held by the loader.
_collLoader.reset();
}
+
onCompletion(finalStatus);
// This will release the resources held by the callback function object. '_onCompletion' is