From bfbeb0cbabd9ae85f34df430474c9e524b274862 Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Tue, 22 Aug 2017 22:53:08 -0400 Subject: SERVER-30799 Avoid misleading empty batches with tailable cursors. This bug impacts tailable cursors being sent through a mongos. --- src/mongo/db/repl/collection_cloner.cpp | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) (limited to 'src/mongo/db/repl/collection_cloner.cpp') diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index ad2987e7603..4626948ece9 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -221,8 +221,7 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _killArmHandle = _arm->kill(opCtx); + _killArmHandle = _arm->kill(cc().getOperationContext()); } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -577,7 +576,9 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa _clusterClientCursorParams = stdx::make_unique(_sourceNss, UserNameIterator()); _clusterClientCursorParams->remotes = std::move(remoteCursors); - _arm = stdx::make_unique(_executor, _clusterClientCursorParams.get()); + Client::initThreadIfNotAlready(); + _arm = stdx::make_unique( + cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -590,6 +591,7 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); + _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -613,6 +615,9 @@ StatusWith> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { + Client::initThreadIfNotAlready(); + auto opCtx = cc().getOperationContext(); + _arm->reattachToOperationContext(opCtx); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -626,6 +631,7 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { _documentsToInsert.push_back(std::move(*queryResult)); } } + _arm->detachFromOperationContext(); return Status::OK(); } @@ -633,8 +639,9 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr onCompletionGuard) { Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - auto nextEvent = _arm->nextEvent(opCtx); + _arm->reattachToOperationContext(cc().getOperationContext()); + auto nextEvent = _arm->nextEvent(); + _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { return nextEvent.getStatus(); } -- cgit v1.2.1