diff options
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index ad2987e7603..4626948ece9 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -221,8 +221,7 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _killArmHandle = _arm->kill(opCtx); + _killArmHandle = _arm->kill(cc().getOperationContext()); } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -577,7 +576,9 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()); _clusterClientCursorParams->remotes = std::move(remoteCursors); - _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get()); + Client::initThreadIfNotAlready(); + _arm = stdx::make_unique<AsyncResultsMerger>( + cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -590,6 +591,7 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); + _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -613,6 +615,9 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + _arm->reattachToOperationContext(opCtx); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -626,6 +631,7 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { _documentsToInsert.push_back(std::move(*queryResult)); } } + _arm->detachFromOperationContext(); return Status::OK(); } @@ -633,8 +639,9 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - auto nextEvent = _arm->nextEvent(opCtx); + _arm->reattachToOperationContext(cc().getOperationContext()); + auto nextEvent = _arm->nextEvent(); + _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { return nextEvent.getStatus(); } |