summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp4
-rw-r--r--src/mongo/s/query/async_results_merger.h4
-rw-r--r--src/mongo/s/query/blocking_results_merger.cpp4
-rw-r--r--src/mongo/s/query/blocking_results_merger.h6
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp4
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.h4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp17
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h11
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp8
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.cpp8
-rw-r--r--src/mongo/s/query/document_source_merge_cursors.h8
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.cpp13
-rw-r--r--src/mongo/s/query/document_source_update_on_add_shard.h8
-rw-r--r--src/mongo/s/query/establish_cursors.cpp4
-rw-r--r--src/mongo/s/query/establish_cursors.h3
-rw-r--r--src/mongo/s/query/owned_remote_cursor.h6
-rw-r--r--src/mongo/s/query/router_stage_merge.h7
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp4
-rw-r--r--src/mongo/s/query/store_possible_cursor.h2
19 files changed, 72 insertions, 53 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 71701cfa4a9..8ee68933d49 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -82,10 +82,10 @@ int compareSortKeys(BSONObj leftSortKey, BSONObj rightSortKey, BSONObj sortKeyPa
} // namespace
AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams params)
: _opCtx(opCtx),
- _executor(executor),
+ _executor(std::move(executor)),
// This strange initialization is to work around the fact that the IDL does not currently
// support a default value for an enum. The default tailable mode should be 'kNormal', but
// since that is not supported we treat boost::none (unspecified) to mean 'kNormal'.
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 92cf05e66f5..3cf357dca6b 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -98,7 +98,7 @@ public:
* with a new, valid OperationContext before the next use.
*/
AsyncResultsMerger(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams params);
/**
@@ -446,7 +446,7 @@ private:
void _updateRemoteMetadata(WithLock, size_t remoteIndex, const CursorResponse& response);
OperationContext* _opCtx;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
TailableModeEnum _tailableMode;
AsyncResultsMergerParams _params;
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
index c8eda4d0253..7d01175c5c5 100644
--- a/src/mongo/s/query/blocking_results_merger.cpp
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -38,11 +38,11 @@ namespace mongo {
BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& armParams,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<ResourceYielder> resourceYielder)
: _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)),
_executor(executor),
- _arm(opCtx, executor, std::move(armParams)),
+ _arm(opCtx, std::move(executor), std::move(armParams)),
_resourceYielder(std::move(resourceYielder)) {}
StatusWith<stdx::cv_status> BlockingResultsMerger::doWaiting(
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
index 75da6fcc26f..563a1e36010 100644
--- a/src/mongo/s/query/blocking_results_merger.h
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/s/query/async_results_merger.h"
#include "mongo/s/query/router_exec_stage.h"
@@ -42,7 +44,7 @@ class BlockingResultsMerger {
public:
BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& arm,
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<ResourceYielder> resourceYielder);
/**
@@ -113,7 +115,7 @@ private:
const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept;
TailableModeEnum _tailableMode;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
// In a case where we have a tailable, awaitData cursor, a call to 'next()' will block waiting
// for an event generated by '_arm', but may time out waiting for this event to be triggered.
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index c11be89a9be..989231ce951 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -485,7 +485,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
std::vector<OwnedRemoteCursor> ownedCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- executor::TaskExecutor* executor) {
+ std::shared_ptr<executor::TaskExecutor> executor) {
auto* opCtx = mergePipeline->getContext()->opCtx;
AsyncResultsMergerParams armParams;
armParams.setSort(shardCursorsSortSpec);
@@ -523,7 +523,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
// For change streams, we need to set up a custom stage to establish cursors on new shards when
// they are added, to ensure we don't miss results from the new shards.
auto mergeCursorsStage = DocumentSourceMergeCursors::create(
- executor, std::move(armParams), mergePipeline->getContext());
+ std::move(executor), std::move(armParams), mergePipeline->getContext());
if (liteParsedPipeline.hasChangeStream()) {
mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create(
diff --git a/src/mongo/s/query/cluster_aggregation_planner.h b/src/mongo/s/query/cluster_aggregation_planner.h
index 1d5ed1e13b5..95158f8a81f 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.h
+++ b/src/mongo/s/query/cluster_aggregation_planner.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/db/pipeline/exchange_spec_gen.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -82,7 +84,7 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
std::vector<OwnedRemoteCursor> remoteCursors,
const std::vector<ShardId>& targetedShards,
boost::optional<BSONObj> shardCursorsSortSpec,
- executor::TaskExecutor*);
+ std::shared_ptr<executor::TaskExecutor> executor);
/**
* Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index ecfb31a2a46..8737dea92ea 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -58,11 +58,12 @@ std::unique_ptr<ClusterClientCursor> ClusterClientCursorGuard::releaseCursor() {
return std::move(_ccc);
}
-ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams&& params) {
+ClusterClientCursorGuard ClusterClientCursorImpl::make(
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ ClusterClientCursorParams&& params) {
std::unique_ptr<ClusterClientCursor> cursor(new ClusterClientCursorImpl(
- opCtx, executor, std::move(params), opCtx->getLogicalSessionId()));
+ opCtx, std::move(executor), std::move(params), opCtx->getLogicalSessionId()));
return ClusterClientCursorGuard(opCtx, std::move(cursor));
}
@@ -75,11 +76,11 @@ ClusterClientCursorGuard ClusterClientCursorImpl::make(OperationContext* opCtx,
}
ClusterClientCursorImpl::ClusterClientCursorImpl(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid)
: _params(std::move(params)),
- _root(buildMergerPlan(opCtx, executor, &_params)),
+ _root(buildMergerPlan(opCtx, std::move(executor), &_params)),
_lsid(lsid),
_opCtx(opCtx),
_createdDate(opCtx->getServiceContext()->getPreciseClockSource()->now()),
@@ -223,7 +224,9 @@ boost::optional<ReadPreferenceSetting> ClusterClientCursorImpl::getReadPreferenc
}
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
- OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 50d36853cf8..2ec439e9627 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -90,7 +90,7 @@ public:
* ensured by an RAII object.
*/
static ClusterClientCursorGuard make(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterClientCursorParams&& params);
/**
@@ -161,7 +161,7 @@ public:
* Constructs a cluster client cursor.
*/
ClusterClientCursorImpl(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterClientCursorParams&& params,
boost::optional<LogicalSessionId> lsid);
@@ -169,9 +169,10 @@ private:
/**
* Constructs the pipeline of MergerPlanStages which will be used to answer the query.
*/
- std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params);
+ std::unique_ptr<RouterExecStage> buildMergerPlan(
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ ClusterClientCursorParams* params);
ClusterClientCursorParams _params;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 215ac79365e..0119abcbd68 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -209,13 +209,15 @@ TEST_F(ClusterClientCursorImplTest, LogicalSessionIdsOnCursors) {
}
TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) {
+ std::shared_ptr<executor::TaskExecutor> nullExecutor;
+
{
// Make a cursor with no lsid or txnNumber.
ClusterClientCursorParams params(NamespaceString("test"), {});
params.lsid = _opCtx->getLogicalSessionId();
params.txnNumber = _opCtx->getTxnNumber();
- auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params));
+ auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params));
ASSERT_FALSE(cursor->getLsid());
ASSERT_FALSE(cursor->getTxnNumber());
}
@@ -229,7 +231,7 @@ TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) {
params.lsid = _opCtx->getLogicalSessionId();
params.txnNumber = _opCtx->getTxnNumber();
- auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params));
+ auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params));
ASSERT_EQ(*cursor->getLsid(), lsid);
ASSERT_FALSE(cursor->getTxnNumber());
}
@@ -243,7 +245,7 @@ TEST_F(ClusterClientCursorImplTest, ShouldStoreLSIDIfSetOnOpCtx) {
params.lsid = _opCtx->getLogicalSessionId();
params.txnNumber = _opCtx->getTxnNumber();
- auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullptr, std::move(params));
+ auto cursor = ClusterClientCursorImpl::make(_opCtx.get(), nullExecutor, std::move(params));
ASSERT_EQ(*cursor->getLsid(), lsid);
ASSERT_EQ(*cursor->getTxnNumber(), txnNumber);
}
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index 0959e8f6efb..d48c6ca804a 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -46,13 +46,13 @@ REGISTER_DOCUMENT_SOURCE(mergeCursors,
constexpr StringData DocumentSourceMergeCursors::kStageName;
DocumentSourceMergeCursors::DocumentSourceMergeCursors(
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams armParams,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<BSONObj> ownedParamsSpec)
: DocumentSource(expCtx),
_armParamsObj(std::move(ownedParamsSpec)),
- _executor(executor),
+ _executor(std::move(executor)),
_armParams(std::move(armParams)) {}
std::size_t DocumentSourceMergeCursors::getNumRemotes() const {
@@ -129,10 +129,10 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
}
boost::intrusive_ptr<DocumentSourceMergeCursors> DocumentSourceMergeCursors::create(
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams params,
const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceMergeCursors(executor, std::move(params), expCtx);
+ return new DocumentSourceMergeCursors(std::move(executor), std::move(params), expCtx);
}
void DocumentSourceMergeCursors::detachFromOperationContext() {
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index 60cdf7d25f6..f8e25299260 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/db/pipeline/document_source.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/blocking_results_merger.h"
@@ -60,7 +62,7 @@ public:
* Creates a new DocumentSourceMergeCursors from the given parameters.
*/
static boost::intrusive_ptr<DocumentSourceMergeCursors> create(
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor>,
AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&);
@@ -149,7 +151,7 @@ protected:
void doDispose() final;
private:
- DocumentSourceMergeCursors(executor::TaskExecutor*,
+ DocumentSourceMergeCursors(std::shared_ptr<executor::TaskExecutor>,
AsyncResultsMergerParams,
const boost::intrusive_ptr<ExpressionContext>&,
boost::optional<BSONObj> ownedParamsSpec = boost::none);
@@ -164,7 +166,7 @@ private:
// params are in use. We store them here.
boost::optional<BSONObj> _armParamsObj;
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
// '_blockingResultsMerger' is lazily populated. Until we need to use it, '_armParams' will be
// populated with the parameters. Once we start using '_blockingResultsMerger', '_armParams'
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
index 3a295f200f7..00a8971454c 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -49,22 +49,25 @@ bool needsUpdate(const Document& childResult) {
boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
std::vector<ShardId> shardsWithCursors,
BSONObj cmdToRunOnNewShards) {
- return new DocumentSourceUpdateOnAddShard(
- expCtx, executor, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards);
+ return new DocumentSourceUpdateOnAddShard(expCtx,
+ std::move(executor),
+ mergeCursors,
+ std::move(shardsWithCursors),
+ cmdToRunOnNewShards);
}
DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
std::vector<ShardId>&& shardsWithCursors,
BSONObj cmdToRunOnNewShards)
: DocumentSource(expCtx),
- _executor(executor),
+ _executor(std::move(executor)),
_mergeCursors(mergeCursors),
_shardsWithCursors(std::move(shardsWithCursors)),
_cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
index 5b00ece5e52..e8f2473b432 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/db/pipeline/document_source.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/document_source_merge_cursors.h"
@@ -51,7 +53,7 @@ public:
*/
static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create(
const boost::intrusive_ptr<ExpressionContext>&,
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
std::vector<ShardId> shardsWithCursors,
BSONObj cmdToRunOnNewShards);
@@ -81,7 +83,7 @@ public:
private:
DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&,
- executor::TaskExecutor*,
+ std::shared_ptr<executor::TaskExecutor> executor,
const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
std::vector<ShardId>&& shardsWithCursors,
BSONObj cmdToRunOnNewShards);
@@ -96,7 +98,7 @@ private:
*/
std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj);
- executor::TaskExecutor* _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
std::vector<ShardId> _shardsWithCursors;
BSONObj _cmdToRunOnNewShards;
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 614672813a1..b97f9026d03 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -50,7 +50,7 @@
namespace mongo {
std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
@@ -136,7 +136,7 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// Schedule killCursors against all cursors that were established.
- killRemoteCursors(opCtx, executor, std::move(remoteCursors), nss);
+ killRemoteCursors(opCtx, executor.get(), std::move(remoteCursors), nss);
} catch (const DBException&) {
// Ignore the new error and rethrow the original one.
}
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index be7eac8d869..97e72225072 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -30,6 +30,7 @@
#pragma once
#include <boost/optional.hpp>
+#include <memory>
#include <vector>
#include "mongo/base/status_with.h"
@@ -64,7 +65,7 @@ class CursorResponse;
*/
std::vector<RemoteCursor> establishCursors(
OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
diff --git a/src/mongo/s/query/owned_remote_cursor.h b/src/mongo/s/query/owned_remote_cursor.h
index 473196e4de5..0e4e03103fe 100644
--- a/src/mongo/s/query/owned_remote_cursor.h
+++ b/src/mongo/s/query/owned_remote_cursor.h
@@ -50,10 +50,8 @@ public:
~OwnedRemoteCursor() {
if (_remoteCursor) {
- killRemoteCursor(_opCtx,
- Grid::get(_opCtx)->getExecutorPool()->getArbitraryExecutor(),
- releaseCursor(),
- _nss);
+ auto executor = Grid::get(_opCtx)->getExecutorPool()->getArbitraryExecutor();
+ killRemoteCursor(_opCtx, executor.get(), releaseCursor(), _nss);
}
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 574f39a280d..a7cead17d89 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -29,6 +29,8 @@
#pragma once
+#include <memory>
+
#include "mongo/executor/task_executor.h"
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
@@ -44,9 +46,10 @@ namespace mongo {
class RouterStageMerge final : public RouterExecStage {
public:
RouterStageMerge(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams&& armParams)
- : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor, nullptr) {}
+ : RouterExecStage(opCtx),
+ _resultsMerger(opCtx, std::move(armParams), std::move(executor), nullptr) {}
StatusWith<ClusterQueryResult> next(ExecContext execCtx) final {
return _resultsMerger.next(getOpCtx(), execCtx);
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index e2edf321a4e..9877af451a2 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -74,7 +74,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const HostAndPort& server,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
TailableModeEnum tailableMode) {
@@ -117,7 +117,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
params.isAutoCommit = false;
}
- auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
+ auto ccc = ClusterClientCursorImpl::make(opCtx, std::move(executor), std::move(params));
// We don't expect to use this cursor until a subsequent getMore, so detach from the current
// OperationContext until then.
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 5202935df32..38b13b4ea7a 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -78,7 +78,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const HostAndPort& server,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
- executor::TaskExecutor* executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
ClusterCursorManager* cursorManager,
PrivilegeVector privileges,
TailableModeEnum tailableMode = TailableModeEnum::kNormal);