summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-08-22 22:53:08 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-08-30 11:47:14 -0400
commitbfbeb0cbabd9ae85f34df430474c9e524b274862 (patch)
tree4697dbc84e56120a5f34a1bd82182dc1e8740131 /src/mongo/db/repl/collection_cloner.cpp
parent88ef24561ef69ac7756b80256a86515180b830a3 (diff)
downloadmongo-bfbeb0cbabd9ae85f34df430474c9e524b274862.tar.gz
SERVER-30799 Avoid misleading empty batches with tailable cursors.
This bug impacts tailable cursors being sent through a mongos.
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp17
1 files changed, 12 insertions, 5 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index ad2987e7603..4626948ece9 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -221,8 +221,7 @@ void CollectionCloner::shutdown() {
void CollectionCloner::_cancelRemainingWork_inlock() {
if (_arm) {
Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- _killArmHandle = _arm->kill(opCtx);
+ _killArmHandle = _arm->kill(cc().getOperationContext());
}
_countScheduler.shutdown();
_listIndexesFetcher.shutdown();
@@ -577,7 +576,9 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
_clusterClientCursorParams =
stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
_clusterClientCursorParams->remotes = std::move(remoteCursors);
- _arm = stdx::make_unique<AsyncResultsMerger>(_executor, _clusterClientCursorParams.get());
+ Client::initThreadIfNotAlready();
+ _arm = stdx::make_unique<AsyncResultsMerger>(
+ cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
// This completion guard invokes _finishCallback on destruction.
auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); };
@@ -590,6 +591,7 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
// outside the mutex. This is a necessary condition to invoke _finishCallback.
stdx::lock_guard<stdx::mutex> lock(_mutex);
Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard);
+ _arm->detachFromOperationContext();
if (!scheduleStatus.isOK()) {
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus);
return;
@@ -613,6 +615,9 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS
}
Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
+ Client::initThreadIfNotAlready();
+ auto opCtx = cc().getOperationContext();
+ _arm->reattachToOperationContext(opCtx);
while (_arm->ready()) {
auto armResultStatus = _arm->nextReady();
if (!armResultStatus.getStatus().isOK()) {
@@ -626,6 +631,7 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
_documentsToInsert.push_back(std::move(*queryResult));
}
}
+ _arm->detachFromOperationContext();
return Status::OK();
}
@@ -633,8 +639,9 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) {
Status CollectionCloner::_scheduleNextARMResultsCallback(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
Client::initThreadIfNotAlready();
- auto opCtx = cc().getOperationContext();
- auto nextEvent = _arm->nextEvent(opCtx);
+ _arm->reattachToOperationContext(cc().getOperationContext());
+ auto nextEvent = _arm->nextEvent();
+ _arm->detachFromOperationContext();
if (!nextEvent.isOK()) {
return nextEvent.getStatus();
}