diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-22 22:53:08 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-30 11:47:14 -0400 |
commit | bfbeb0cbabd9ae85f34df430474c9e524b274862 (patch) | |
tree | 4697dbc84e56120a5f34a1bd82182dc1e8740131 /src/mongo/db/repl/collection_cloner.cpp | |
parent | 88ef24561ef69ac7756b80256a86515180b830a3 (diff) | |
download | mongo-bfbeb0cbabd9ae85f34df430474c9e524b274862.tar.gz |
SERVER-30799 Avoid misleading empty batches with tailable cursors.
This bug impacts tailable cursors being sent through a mongos.
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index ad2987e7603..4626948ece9 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -221,8 +221,7 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _killArmHandle = _arm->kill(opCtx); + _killArmHandle = _arm->kill(cc().getOperationContext()); } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -577,7 +576,9 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa _clusterClientCursorParams = stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator()); _clusterClientCursorParams->remotes = std::move(remoteCursors); - _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get()); + Client::initThreadIfNotAlready(); + _arm = stdx::make_unique<AsyncResultsMerger>( + cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -590,6 +591,7 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); + _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -613,6 +615,9 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + _arm->reattachToOperationContext(opCtx); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -626,6 +631,7 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { _documentsToInsert.push_back(std::move(*queryResult)); } } + _arm->detachFromOperationContext(); return Status::OK(); } @@ -633,8 +639,9 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - auto nextEvent = _arm->nextEvent(opCtx); + _arm->reattachToOperationContext(cc().getOperationContext()); + auto nextEvent = _arm->nextEvent(); + _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { return nextEvent.getStatus(); } |