diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2023-04-05 13:50:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-05 18:07:43 +0000 |
commit | db68d8f57be484d25ba4c37bd049e0c07cb7d645 (patch) | |
tree | c0eaf82f0b2a54b4f444c95d163be4f337895a58 | |
parent | f2df94329a61c09fddeb8007b184b900be22b774 (diff) | |
download | mongo-db68d8f57be484d25ba4c37bd049e0c07cb7d645.tar.gz |
SERVER-73613 Let TaskExecutorCursor Pin Connections When Requested
-rw-r--r-- | src/mongo/executor/SConscript | 21 | ||||
-rw-r--r-- | src/mongo/executor/pinned_connection_task_executor.cpp | 77 | ||||
-rw-r--r-- | src/mongo/executor/pinned_connection_task_executor.h | 10 | ||||
-rw-r--r-- | src/mongo/executor/pinned_connection_task_executor_factory.cpp | 54 | ||||
-rw-r--r-- | src/mongo/executor/pinned_connection_task_executor_factory.h | 56 | ||||
-rw-r--r-- | src/mongo/executor/pinned_connection_task_executor_test.cpp | 18 | ||||
-rw-r--r-- | src/mongo/executor/pinned_connection_task_executor_test_fixture.h | 47 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.cpp | 90 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.h | 15 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_integration_test.cpp | 87 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_test.cpp | 929 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 3 |
12 files changed, 979 insertions, 428 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 86e36f5c49a..b112f2c6aa3 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -330,6 +330,9 @@ env.Library( '$BUILD_DIR/mongo/db/query/command_request_response', 'task_executor_interface', ], + LIBDEPS_PRIVATE=[ + 'pinned_connection_task_executor_factory', + ], ) env.Library( @@ -346,6 +349,21 @@ env.Library( ) env.Library( + target='pinned_connection_task_executor_factory', + source=[ + 'pinned_connection_task_executor_factory.cpp', + ], + LIBDEPS=[ + 'task_executor_interface', + ], + LIBDEPS_PRIVATE=[ + 'network_interface', + 'pinned_connection_task_executor', + 'thread_pool_task_executor', + ], +) + +env.Library( target='inline_executor', source=[ 'inline_executor.cpp', @@ -386,7 +404,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/hello_command', '$BUILD_DIR/mongo/db/service_context_test_fixture', '$BUILD_DIR/mongo/s/mongos_server_parameters', - "$BUILD_DIR/mongo/s/sharding_router_test_fixture", + '$BUILD_DIR/mongo/s/sharding_router_test_fixture', '$BUILD_DIR/mongo/transport/message_compressor', '$BUILD_DIR/mongo/util/clock_source_mock', 'async_rpc', @@ -419,6 +437,7 @@ env.CppIntegrationTest( '$BUILD_DIR/mongo/db/wire_version', '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/network_interface_thread_pool', + '$BUILD_DIR/mongo/executor/pinned_connection_task_executor', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', '$BUILD_DIR/mongo/transport/transport_layer_egress_init', '$BUILD_DIR/mongo/util/concurrency/thread_pool', diff --git a/src/mongo/executor/pinned_connection_task_executor.cpp b/src/mongo/executor/pinned_connection_task_executor.cpp index 727726e59c3..975a3cbfcdb 100644 --- a/src/mongo/executor/pinned_connection_task_executor.cpp +++ b/src/mongo/executor/pinned_connection_task_executor.cpp @@ -33,7 +33,6 @@ #include "mongo/util/scoped_unlock.h" namespace mongo::executor { - /** * Used as the state for 
callbacks _only_ for RPCs scheduled through this executor. */ @@ -68,15 +67,22 @@ public: } // Run callback with a CallbackCanceled error. - static void runCallbackCanceled(RequestAndCallback rcb, TaskExecutor* exec) { + static void runCallbackCanceled(stdx::unique_lock<Latch>& lk, + RequestAndCallback rcb, + TaskExecutor* exec) { CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, rcb.second); auto errorResponse = RemoteCommandOnAnyResponse(boost::none, kCallbackCanceledErrorStatus); - rcb.second->callback({exec, cbHandle, rcb.first, errorResponse}); + TaskExecutor::RemoteCommandOnAnyCallbackFn callback; + using std::swap; + swap(rcb.second->callback, callback); + ScopedUnlock guard(lk); + callback({exec, cbHandle, rcb.first, errorResponse}); } // Run callback with the provided success result. - static void runCallbackSuccess(RequestAndCallback rcb, + static void runCallbackSuccess(stdx::unique_lock<Latch>& lk, + RequestAndCallback rcb, TaskExecutor* exec, const StatusWith<RemoteCommandResponse>& result, const HostAndPort& targetUsed) { @@ -87,16 +93,21 @@ public: RemoteCommandOnAnyResponse asOnAnyRcr(targetUsed, asRcr); CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, rcb.second); - rcb.second->callback({exec, cbHandle, rcb.first, asOnAnyRcr}); + TaskExecutor::RemoteCommandOnAnyCallbackFn callback; + using std::swap; + swap(rcb.second->callback, callback); + ScopedUnlock guard(lk); + callback({exec, cbHandle, rcb.first, asOnAnyRcr}); } // All fields except for "canceled" are guarded by the owning task executor's _mutex. 
enum class State { kWaiting, kRunning, kDone, kCanceled }; - const RemoteCommandOnAnyCallbackFn callback; + RemoteCommandOnAnyCallbackFn callback; boost::optional<stdx::condition_variable> finishedCondition; State state{State::kWaiting}; bool isNetworkOperation = true; + bool startedNetworking = false; BatonHandle baton; }; @@ -239,10 +250,17 @@ ExecutorFuture<void> PinnedConnectionTaskExecutor::_ensureStream(WithLock, } Future<executor::RemoteCommandResponse> PinnedConnectionTaskExecutor::_runSingleCommand( - RemoteCommandRequest command, BatonHandle baton) { + RemoteCommandRequest command, std::shared_ptr<CallbackState> cbState) { stdx::lock_guard lk{_mutex}; + if (auto& state = cbState->state; MONGO_unlikely(state == CallbackState::State::kCanceled)) { + // It's possible this callback was canceled after it was moved + // out of the queue, but before we actually started work on the client. + // In that case, don't run it. + return kCallbackCanceledErrorStatus; + } auto client = _stream->getClient(); - return client->runCommandRequest(command, baton); + cbState->startedNetworking = true; + return client->runCommandRequest(command, cbState->baton); } boost::optional<PinnedConnectionTaskExecutor::RequestAndCallback> @@ -251,8 +269,7 @@ PinnedConnectionTaskExecutor::_getFirstUncanceledRequest(stdx::unique_lock<Latch auto req = std::move(_requestQueue.front()); _requestQueue.pop_front(); if (req.second->state == CallbackState::State::kCanceled) { - ScopedUnlock guard(lk); - CallbackState::runCallbackCanceled(req, this); + CallbackState::runCallbackCanceled(lk, req, this); } else { return req; } @@ -276,32 +293,38 @@ void PinnedConnectionTaskExecutor::_doNetworking(stdx::unique_lock<Latch>&& lk) invariant(req.second->state == CallbackState::State::kWaiting); req.second->state = CallbackState::State::kRunning; auto streamFut = _ensureStream(lk, req.first.target, req.first.timeout); + // Stash the in-progress operation before releasing the lock so we can + // access it 
if we're shutdown while it's in-progress. + _inProgressRequest = req.second; lk.unlock(); std::move(streamFut) - .then([req, this]() { return _runSingleCommand(req.first, req.second->baton); }) + .then([req, this]() { return _runSingleCommand(req.first, req.second); }) .thenRunOn(makeGuaranteedExecutor(req.second->baton, _cancellationExecutor)) - .getAsync([req, this](StatusWith<RemoteCommandResponse> result) { + .getAsync([req, this, self = shared_from_this()](StatusWith<RemoteCommandResponse> result) { stdx::unique_lock<Latch> lk{_mutex}; + _inProgressRequest.reset(); if (auto& state = req.second->state; MONGO_unlikely(state == CallbackState::State::kCanceled)) { - ScopedUnlock guard(lk); - CallbackState::runCallbackCanceled(req, this); + CallbackState::runCallbackCanceled(lk, req, this); } else { invariant(state == CallbackState::State::kRunning); + invariant(req.second->startedNetworking); state = CallbackState::State::kDone; auto target = _stream->getClient()->remote(); - ScopedUnlock guard(lk); - CallbackState::runCallbackSuccess(req, this, result, target); + CallbackState::runCallbackSuccess(lk, req, this, result, target); } - if (auto status = result.getStatus(); status.isOK()) { - _stream->indicateUsed(); - _stream->indicateSuccess(); - } else { - // We didn't get a response from the remote. - // We assume the stream is broken and therefore can do no more work. Notify the - // stream of the failure, and shutdown. - _stream->indicateFailure(status); - _shutdown(lk); + // If we used the _stream, update it accordingly. + if (req.second->startedNetworking) { + if (auto status = result.getStatus(); status.isOK()) { + _stream->indicateUsed(); + _stream->indicateSuccess(); + } else { + // We didn't get a response from the remote. + // We assume the stream is broken and therefore can do no more work. Notify the + // stream of the failure, and shutdown. 
+ _stream->indicateFailure(status); + _shutdown(lk); + } } _isDoingNetworking = false; if (!_requestQueue.empty()) { @@ -320,6 +343,10 @@ void PinnedConnectionTaskExecutor::_shutdown(WithLock lk) { for (auto&& [_, cbState] : _requestQueue) { _cancel(lk, cbState.get()); } + if (_isDoingNetworking && _inProgressRequest) { + // Cancel the in-progress request that was already popped from the queue. + _cancel(lk, _inProgressRequest.get()); + } } void PinnedConnectionTaskExecutor::shutdown() { diff --git a/src/mongo/executor/pinned_connection_task_executor.h b/src/mongo/executor/pinned_connection_task_executor.h index 514effd87d7..cf960fb0121 100644 --- a/src/mongo/executor/pinned_connection_task_executor.h +++ b/src/mongo/executor/pinned_connection_task_executor.h @@ -140,14 +140,15 @@ private: // Start processing pending/queued RPCs. void _doNetworking(stdx::unique_lock<Latch>&&); - // Asynchronously invoke the RPC `command` and return a future of its response. + // CallbackState for RPCs. Non-RPC callbacks use the CallbackState from the _underlyingExecutor. + class CallbackState; + + // Invoke the RPC and return a future of its response. Future<RemoteCommandResponse> _runSingleCommand(RemoteCommandRequest command, - BatonHandle baton); + std::shared_ptr<CallbackState> cbState); void _shutdown(WithLock); - // CallbackState for RPCs. Non-RPC callbacks use the CallbackState from the _underlyingExecutor. - class CallbackState; // Alias for an RPC request and the associated CallbackState. using RequestAndCallback = std::pair<RemoteCommandRequest, std::shared_ptr<CallbackState>>; @@ -183,6 +184,7 @@ private: // RPCs scheduled through this executor. 
std::unique_ptr<NetworkInterface::LeasedStream> _stream; bool _isDoingNetworking{false}; + std::shared_ptr<CallbackState> _inProgressRequest; enum class State { running, joinRequired, joining, shutdownComplete }; State _state = State::running; diff --git a/src/mongo/executor/pinned_connection_task_executor_factory.cpp b/src/mongo/executor/pinned_connection_task_executor_factory.cpp new file mode 100644 index 00000000000..e5871adbaa0 --- /dev/null +++ b/src/mongo/executor/pinned_connection_task_executor_factory.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include <memory> + +#include "mongo/executor/pinned_connection_task_executor_factory.h" + +#include "mongo/executor/pinned_connection_task_executor.h" +#include "mongo/executor/thread_pool_task_executor.h" + +namespace mongo { +namespace executor { + +std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec, + NetworkInterface* net) { + return std::make_shared<PinnedConnectionTaskExecutor>(std::move(exec), net); +} + +std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec) { + auto tpte = dynamic_cast<ThreadPoolTaskExecutor*>(exec.get()); + invariant(tpte, + "Connection-pinning task executors can only be constructed from " + "ThreadPoolTaskExecutor unless an explicit NetworkInterface is provided."); + return makePinnedConnectionTaskExecutor(std::move(exec), tpte->_net.get()); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/pinned_connection_task_executor_factory.h b/src/mongo/executor/pinned_connection_task_executor_factory.h new file mode 100644 index 00000000000..dbf79481bff --- /dev/null +++ b/src/mongo/executor/pinned_connection_task_executor_factory.h @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> + +#include "mongo/executor/network_interface.h" +#include "mongo/executor/task_executor.h" + +namespace mongo { +namespace executor { + +/** + * Returns a new TaskExecutor that does all of its RPC execution over the same transport session. + * The returned executor uses `exec`'s execution resources and acquires the transport session from + * `net`. + */ +std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec, + NetworkInterface* net); + +/** + * Returns a new TaskExecutor that does all of its RPC execution over the same transport session. + * The provided executor _must_ be a ThreadPoolTaskExecutor, and its underlying execution and + * network resources will be used by the returned executor. 
+ */ +std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec); + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/pinned_connection_task_executor_test.cpp b/src/mongo/executor/pinned_connection_task_executor_test.cpp index 8313642fa43..d5629c898ab 100644 --- a/src/mongo/executor/pinned_connection_task_executor_test.cpp +++ b/src/mongo/executor/pinned_connection_task_executor_test.cpp @@ -230,7 +230,6 @@ DEATH_TEST_REGEX_F( // Second command should invariant once the PCTE attempts to run it, because it has a different // remote target. - // Should never be fulfilled. ASSERT_OK(pfTwo.future.getNoThrow()); } @@ -242,7 +241,7 @@ TEST_F(PinnedConnectionTaskExecutorTest, CancelRPC) { auto rcr = makeRCR(remote, BSONObj()); auto pf = makePromiseFuture<void>(); - // Schedule a command + // Schedule a command. auto swCbHandle = pinnedTE->scheduleRemoteCommand( std::move(rcr), [&](const TaskExecutor::RemoteCommandCallbackArgs& args) { pf.promise.setWith([&] { return args.response.status; }); @@ -256,6 +255,19 @@ TEST_F(PinnedConnectionTaskExecutorTest, CancelRPC) { pinnedTE->join(); } +TEST_F(PinnedConnectionTaskExecutorTest, ShutdownWithRPCInProgress) { + auto pinnedTE = makePinnedConnTaskExecutor(); + auto pf = makePromiseFuture<void>(); + ASSERT_OK(pinnedTE->scheduleRemoteCommand( + makeRCR(HostAndPort("mock"), BSONObj()), + [&](const TaskExecutor::RemoteCommandCallbackArgs& args) { + pf.promise.setWith([&] { return args.response.status; }); + })); + pinnedTE->shutdown(); + ASSERT_EQ(pf.future.getNoThrow(), TaskExecutor::kCallbackCanceledErrorStatus); + pinnedTE->join(); +} + TEST_F(PinnedConnectionTaskExecutorTest, CancelNonRPC) { auto pinnedTE = makePinnedConnTaskExecutor(); @@ -317,7 +329,7 @@ TEST_F(PinnedConnectionTaskExecutorTest, EnsureStreamIsUpdatedAfterUse) { TEST_F(PinnedConnectionTaskExecutorTest, StreamFailureShutsDownAndCancels) { auto pinnedTE = makePinnedConnTaskExecutor(); HostAndPort 
remote("mock"); - // + // We haven't done any RPCs, so we shouldn't have touched any of the stream counters. ASSERT_EQ(_indicateSuccessCalls.load(), 0); ASSERT_EQ(_indicateUsedCalls.load(), 0); diff --git a/src/mongo/executor/pinned_connection_task_executor_test_fixture.h b/src/mongo/executor/pinned_connection_task_executor_test_fixture.h index a46b1a31d2f..8e7c3c503d1 100644 --- a/src/mongo/executor/pinned_connection_task_executor_test_fixture.h +++ b/src/mongo/executor/pinned_connection_task_executor_test_fixture.h @@ -33,15 +33,16 @@ #include "mongo/db/dbmessage.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/pinned_connection_task_executor.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/transport/mock_session.h" #include "mongo/transport/transport_layer.h" #include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/thread_pool.h" -#include "pinned_connection_task_executor.h" namespace mongo::executor { @@ -66,8 +67,17 @@ public: Status sinkMessageCalled(Message message) { stdx::unique_lock lk{_mutex}; - _cv.wait(lk, [&] { return !!_sinkMessageExpectation; }); + _hasWaitingSinkMessage = true; + _cv.wait(lk, [&] { return !!_sinkMessageExpectation || _isCanceled; }); + if (_isCanceled) { + // Consume the cancellation. 
+ _isCanceled = false; + _sinkMessageExpectation = [&](auto&&) { + return _cancellationError; + }; + } auto expectation = *std::exchange(_sinkMessageExpectation, {}); + _hasWaitingSinkMessage = false; return expectation(message); } @@ -80,8 +90,17 @@ public: StatusWith<Message> sourceMessageCalled() { stdx::unique_lock lk{_mutex}; - _cv.wait(lk, [&] { return !!_sourceMessageExpectation; }); + _hasWaitingSourceMessage = true; + _cv.wait(lk, [&] { return !!_sourceMessageExpectation || _isCanceled; }); + if (_isCanceled) { + // Consume the cancellation. + _isCanceled = false; + _sourceMessageExpectation = [&]() { + return _cancellationError; + }; + } auto expectation = *std::exchange(_sourceMessageExpectation, {}); + _hasWaitingSourceMessage = false; return expectation(); } @@ -93,22 +112,16 @@ public: } void cancelAsyncOpsCalled() { - // If cancelAsyncOps was called on the session, we've already - // reached the "running" stage. So we can mock a response - // for source/sink message to allow the cancellation to take - // in the executor. 
stdx::unique_lock lk{_mutex}; - _sourceMessageExpectation = []() { - rpc::OpMsgReplyBuilder replyBuilder; - replyBuilder.setCommandReply(BSONObj()); - return replyBuilder.done(); - }; - _sinkMessageExpectation = [](auto&&) { - return Status::OK(); - }; + _isCanceled = true; _cv.notify_one(); } + bool hasReadyRequests() { + stdx::lock_guard lk{_mutex}; + return _hasWaitingSinkMessage || _hasWaitingSourceMessage; + } + std::shared_ptr<PinnedConnectionTaskExecutor> makePinnedConnTaskExecutor() { return std::make_shared<PinnedConnectionTaskExecutor>(getExecutorPtr(), getNet()); } @@ -119,6 +132,10 @@ private: stdx::condition_variable _cv; boost::optional<SinkMessageCbT> _sinkMessageExpectation; boost::optional<SourceMessageCbT> _sourceMessageExpectation; + bool _hasWaitingSinkMessage = false; + bool _hasWaitingSourceMessage = false; + bool _isCanceled = false; + Status _cancellationError = Status{ErrorCodes::SocketException, "Socket closed"}; class CustomMockSession : public transport::MockSessionBase { public: diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp index 015fb9944dd..dcbe6e1a945 100644 --- a/src/mongo/executor/task_executor_cursor.cpp +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/query/getmore_command_gen.h" #include "mongo/db/query/kill_cursors_gen.h" +#include "mongo/executor/pinned_connection_task_executor_factory.h" #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" #include "mongo/util/time_support.h" @@ -42,24 +43,38 @@ namespace mongo { namespace executor { +namespace { +MONGO_FAIL_POINT_DEFINE(blockBeforePinnedExecutorIsDestroyedOnUnderlying); +} // namespace -TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor, +TaskExecutorCursor::TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor, const RemoteCommandRequest& rcr, Options&& options) - : _executor(executor), 
_rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) { + : _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) { if (rcr.opCtx) { _lsid = rcr.opCtx->getLogicalSessionId(); } + if (_options.pinConnection) { + _executor = makePinnedConnectionTaskExecutor(executor); + _underlyingExecutor = std::move(executor); + } else { + _executor = std::move(executor); + } _runRemoteCommand(_createRequest(_rcr.opCtx, _rcr.cmdObj)); } -TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor, +TaskExecutorCursor::TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor, + std::shared_ptr<executor::TaskExecutor> underlyingExec, CursorResponse&& response, RemoteCommandRequest& rcr, Options&& options) - : _executor(executor), _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) { + : _executor(std::move(executor)), + _underlyingExecutor(std::move(underlyingExec)), + _rcr(rcr), + _options(std::move(options)), + _batchIter(_batch.end()) { tassert(6253101, "rcr must have an opCtx to use construct cursor from response", rcr.opCtx); _lsid = rcr.opCtx->getLogicalSessionId(); @@ -67,7 +82,8 @@ TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor, } TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) - : _executor(other._executor), + : _executor(std::move(other._executor)), + _underlyingExecutor(std::move(other._underlyingExecutor)), _rcr(other._rcr), _options(std::move(other._options)), _lsid(other._lsid), @@ -98,15 +114,19 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other) TaskExecutorCursor::~TaskExecutorCursor() { try { - if (_cursorId < kMinLegalCursorId) { + if (_cursorId < kMinLegalCursorId || _options.pinConnection) { // The initial find to establish the cursor has to be canceled to avoid leaking cursors. // Once the cursor is established, killing the cursor will interrupt any ongoing // `getMore` operation. 
+ // Additionally, in pinned mode, we should cancel any in-progress RPC if there is one, + // even at the cost of churning the connection, because it's the only way to interrupt + // the ongoing operation. if (_cmdState) { _executor->cancel(_cmdState->cbHandle); } - - return; + if (_cursorId < kMinLegalCursorId) { + return; + } } // We deliberately ignore failures to kill the cursor. This "best effort" is acceptable @@ -115,15 +135,43 @@ TaskExecutorCursor::~TaskExecutorCursor() { // That timeout mechanism could be the default cursor timeout, or the logical session // timeout if an lsid is used. // - // Killing the cursor also interrupts any ongoing getMore operations on this cursor. Avoid - // canceling the remote command through its callback handle as that may close the underlying - // connection. - _executor - ->scheduleRemoteCommand( - _createRequest(nullptr, - KillCursorsCommandRequest(_ns, {_cursorId}).toBSON(BSONObj{})), - [](const auto&) {}) - .isOK(); + // In non-pinned mode, killing the cursor also interrupts any ongoing getMore operations on + // this cursor. Avoid canceling the remote command through its callback handle as that may + // close the underlying connection. + // + // In pinned mode, we must await completion of the killCursors to safely reuse the pinned + // connection. This requires allocating an executor thread (from `_underlyingExecutor`) upon + // completion of the killCursors command to shutdown and destroy the pinned executor. This + // is necessary as joining an executor from its own threads results in a deadlock. 
+ TaskExecutor::RemoteCommandCallbackFn callbackToRun = [](const auto&) { + }; + if (_options.pinConnection) { + invariant(_underlyingExecutor, + "TaskExecutorCursor in pinning mode must have an underlying executor"); + callbackToRun = [main = _executor, underlying = _underlyingExecutor](const auto&) { + underlying->schedule([main = std::move(main)](const auto&) { + if (MONGO_unlikely( + blockBeforePinnedExecutorIsDestroyedOnUnderlying.shouldFail())) { + LOGV2(7361300, + "Hanging before destroying a TaskExecutorCursor's pinning executor."); + blockBeforePinnedExecutorIsDestroyedOnUnderlying.pauseWhileSet(); + } + // Returning from this callback will destroy the pinned executor on + // underlying if this is the last TaskExecutorCursor using that pinned executor. + }); + }; + } + auto swCallback = _executor->scheduleRemoteCommand( + _createRequest(nullptr, KillCursorsCommandRequest(_ns, {_cursorId}).toBSON(BSONObj{})), + callbackToRun); + + // It's possible the executor is already shutdown and rejects work. If so, run the callback + // inline. + if (!swCallback.isOK()) { + TaskExecutor::RemoteCommandCallbackArgs args( + _executor.get(), {}, {}, swCallback.getStatus()); + callbackToRun(args); + } } catch (const DBException& ex) { LOGV2(6531704, "Encountered an error while destroying a cursor executor", @@ -247,11 +295,17 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { // them. Skip the first response, we used it to populate this cursor. // Ensure we update the RCR we give to each 'child cursor' with the current opCtx. 
auto freshRcr = _createRequest(opCtx, _rcr.cmdObj); + auto copyOptions = [&] { + TaskExecutorCursor::Options options; + options.pinConnection = _options.pinConnection; + return options; + }; for (unsigned int i = 1; i < cursorResponses.size(); ++i) { _additionalCursors.emplace_back(_executor, + _underlyingExecutor, uassertStatusOK(std::move(cursorResponses[i])), freshRcr, - TaskExecutorCursor::Options()); + copyOptions()); } } diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h index 59015e87d1c..5da3a68bae5 100644 --- a/src/mongo/executor/task_executor_cursor.h +++ b/src/mongo/executor/task_executor_cursor.h @@ -68,6 +68,8 @@ public: struct Options { boost::optional<int64_t> batchSize; + bool pinConnection{false}; + Options() {} }; /** @@ -78,7 +80,7 @@ public: * opCtx - The Logical Session Id from the initial command is carried over in all later stages. * NOTE - the actual command must not include the lsid */ - explicit TaskExecutorCursor(executor::TaskExecutor* executor, + explicit TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor, const RemoteCommandRequest& rcr, Options&& options = {}); @@ -87,8 +89,11 @@ public: * The executor is used for subsequent getMore calls. Uses the original RemoteCommandRequest * to build subsequent commands. Takes ownership of the CursorResponse and gives it to the new * cursor. + * If the cursor should reuse the original transport connection that opened the original + * cursor, make sure the pinning executor that was used to open that cursor is provided. 
*/ - TaskExecutorCursor(executor::TaskExecutor* executor, + TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor, + std::shared_ptr<executor::TaskExecutor> underlyingExec, CursorResponse&& response, RemoteCommandRequest& rcr, Options&& options = {}); @@ -191,7 +196,11 @@ private: */ const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd); - executor::TaskExecutor* const _executor; + std::shared_ptr<executor::TaskExecutor> _executor; + // If we are pinning connections, we need to keep a separate reference to the + // non-pinning, normal executor, so that we can shut down the pinned executor + // out-of-line. + std::shared_ptr<executor::TaskExecutor> _underlyingExecutor; // Used as a scratch pad for the successive scheduleRemoteCommand calls RemoteCommandRequest _rcr; diff --git a/src/mongo/executor/task_executor_cursor_integration_test.cpp b/src/mongo/executor/task_executor_cursor_integration_test.cpp index abcb8666a00..c3dcfce091a 100644 --- a/src/mongo/executor/task_executor_cursor_integration_test.cpp +++ b/src/mongo/executor/task_executor_cursor_integration_test.cpp @@ -37,6 +37,7 @@ #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/pinned_connection_task_executor.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/unittest/integration_test.h" @@ -55,20 +56,25 @@ public: } void setUp() override { - std::shared_ptr<NetworkInterface> ni = makeNetworkInterface("TaskExecutorCursorTest"); - auto tp = std::make_unique<NetworkInterfaceThreadPool>(ni.get()); + _ni = makeNetworkInterface("TaskExecutorCursorTest"); + auto tp = std::make_unique<NetworkInterfaceThreadPool>(_ni.get()); - _executor = std::make_unique<ThreadPoolTaskExecutor>(std::move(tp), std::move(ni)); + _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(tp), _ni); 
+ _executor->startup(); }; void tearDown() override { _executor->shutdown(); + _executor->join(); _executor.reset(); }; - TaskExecutor* executor() { - return _executor.get(); + std::shared_ptr<TaskExecutor> executor() { + return _executor; + } + + auto net() { + return _ni.get(); } auto makeOpCtx() { @@ -93,7 +99,8 @@ public: private: ServiceContext::UniqueServiceContext _serviceCtx = ServiceContext::make(); - std::unique_ptr<ThreadPoolTaskExecutor> _executor; + std::shared_ptr<ThreadPoolTaskExecutor> _executor; + std::shared_ptr<NetworkInterface> _ni; ServiceContext::UniqueClient _client = _serviceCtx->makeClient("TaskExecutorCursorTest"); }; @@ -114,7 +121,7 @@ size_t createTestData(std::string ns, size_t numDocs) { return dbclient->count(nss); } -// Test that we can actually use a TaskExecutorCursor to read multiple batches from a remote host +// Test that we can actually use a TaskExecutorCursor to read multiple batches from a remote host. TEST_F(TaskExecutorCursorFixture, Basic) { const size_t numDocs = 100; ASSERT_EQ(createTestData("test.test", numDocs), numDocs); @@ -141,6 +148,70 @@ TEST_F(TaskExecutorCursorFixture, Basic) { ASSERT_EQUALS(count, numDocs); } +// Test that we can actually use a TaskExecutorCursor that pins its connection to read multiple +// batches from a remote host.
+TEST_F(TaskExecutorCursorFixture, BasicPinned) { + const size_t numDocs = 100; + ASSERT_EQ(createTestData("test.test", numDocs), numDocs); + + auto opCtx = makeOpCtx(); + RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(), + "test", + BSON("find" + << "test" + << "batchSize" << 10), + opCtx.get()); + + TaskExecutorCursor tec(executor(), rcr, [this] { + TaskExecutorCursor::Options opts; + opts.batchSize = 10; + opts.pinConnection = true; + return opts; + }()); + + size_t count = 0; + while (auto doc = tec.getNext(opCtx.get())) { + count++; + } + + ASSERT_EQUALS(count, numDocs); +} + +// Test that when a TaskExecutorCursor is used in pinning-mode, the pinned executor's destruction +// is scheduled on the underlying executor. +TEST_F(TaskExecutorCursorFixture, PinnedExecutorDestroyedOnUnderlying) { + const size_t numDocs = 100; + ASSERT_EQ(createTestData("test.test", numDocs), numDocs); + + auto opCtx = makeOpCtx(); + RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(), + "test", + BSON("find" + << "test" + << "batchSize" << 10), + opCtx.get()); + + boost::optional<TaskExecutorCursor> tec; + tec.emplace(executor(), rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 10; + opts.pinConnection = true; + return opts; + }()); + // Fetch a document to make sure the TEC was initialized properly. + ASSERT(tec->getNext(opCtx.get())); + // Enable the failpoint in the integration test process. + { + FailPointEnableBlock fpb("blockBeforePinnedExecutorIsDestroyedOnUnderlying"); + auto initialTimesEntered = fpb.initialTimesEntered(); + // Destroy the TEC and ensure we reach the code block that will destroy the pinned executor. + tec.reset(); + LOGV2(7361301, "Waiting for TaskExecutorCursor to destroy its pinning executor."); + fpb->waitForTimesEntered(initialTimesEntered + 1); + } + // Allow the pinned executor's destruction to proceed.
+} + /** * Verifies that the underlying connection used to run `getMore` commands remains open, even after * the instance of `TaskExecutorCursor` is destroyed. @@ -240,7 +311,7 @@ TEST_F(TaskExecutorCursorFixture, ConnectionRemainsOpenAfterKillingTheCursor) { const auto afterStats = getConnectionStatsForTarget(); auto countOpenConns = [](const ConnectionStatsPer& stats) { - return stats.inUse + stats.available + stats.refreshing; + return stats.inUse + stats.available + stats.refreshing + stats.leased; }; // Verify that no connection is created or closed. diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp index 982aa527fe4..02db9ed2967 100644 --- a/src/mongo/executor/task_executor_cursor_test.cpp +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -31,8 +31,11 @@ #include "mongo/platform/basic.h" #include "mongo/db/concurrency/locker_noop_client_observer.h" +#include "mongo/executor/pinned_connection_task_executor.h" +#include "mongo/executor/pinned_connection_task_executor_test_fixture.h" #include "mongo/executor/task_executor_cursor.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/unittest/bson_test_util.h" #include "mongo/unittest/unittest.h" @@ -43,490 +46,714 @@ namespace mongo { namespace executor { namespace { +BSONObj buildCursorResponse(StringData fieldName, size_t start, size_t end, size_t cursorId) { + BSONObjBuilder bob; + { + BSONObjBuilder cursor(bob.subobjStart("cursor")); + { + BSONArrayBuilder batch(cursor.subarrayStart(fieldName)); + + for (size_t i = start; i <= end; ++i) { + BSONObjBuilder doc(batch.subobjStart()); + doc.append("x", int(i)); + } + } + cursor.append("id", (long long)(cursorId)); + cursor.append("ns", "test.test"); + } + bob.append("ok", int(1)); + return bob.obj(); +} + +BSONObj buildMultiCursorResponse(StringData fieldName, + size_t start, + size_t end, + std::vector<size_t> cursorIds) 
{ + BSONObjBuilder bob; + { + BSONArrayBuilder cursors; + int baseCursorValue = 1; + for (auto cursorId : cursorIds) { + BSONObjBuilder cursor; + BSONArrayBuilder batch; + ASSERT(start < end && end < INT_MAX); + for (size_t i = start; i <= end; ++i) { + batch.append(BSON("x" << static_cast<int>(i) * baseCursorValue).getOwned()); + } + cursor.append(fieldName, batch.arr()); + cursor.append("id", (long long)(cursorId)); + cursor.append("ns", "test.test"); + auto cursorObj = BSON("cursor" << cursor.done() << "ok" << 1); + cursors.append(cursorObj.getOwned()); + ++baseCursorValue; + } + bob.append("cursors", cursors.arr()); + } + bob.append("ok", 1); + return bob.obj(); +} + /** * Fixture for the task executor cursor tests which offers some convenience methods to help with - * scheduling responses + * scheduling responses. Uses the CRTP pattern so that the tests can be shared between child-classes + * that provide their own implementations of the network-mocking needed for the tests. */ -class TaskExecutorCursorFixture : public ThreadPoolExecutorTest { +template <typename Derived, typename Base> +class TaskExecutorCursorTestFixture : public Base { public: void setUp() override { - ThreadPoolExecutorTest::setUp(); - + Base::setUp(); client = serviceCtx->makeClient("TaskExecutorCursorTest"); opCtx = client->makeOperationContext(); - - launchExecutorThread(); + static_cast<Derived*>(this)->postSetUp(); } void tearDown() override { opCtx.reset(); client.reset(); - ThreadPoolExecutorTest::tearDown(); + Base::tearDown(); } BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, size_t start, size_t end, size_t cursorId) { - NetworkInterfaceMock::InNetworkGuard ing(getNet()); - - BSONObjBuilder bob; - { - BSONObjBuilder cursor(bob.subobjStart("cursor")); - { - BSONArrayBuilder batch(cursor.subarrayStart(fieldName)); - - for (size_t i = start; i <= end; ++i) { - BSONObjBuilder doc(batch.subobjStart()); - doc.append("x", int(i)); - } - } - cursor.append("id", (long 
long)(cursorId)); - cursor.append("ns", "test.test"); - } - bob.append("ok", int(1)); - - ASSERT(getNet()->hasReadyRequests()); - auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj()); - getNet()->runReadyNetworkOperations(); - - return rcr.cmdObj.getOwned(); + return static_cast<Derived*>(this)->scheduleSuccessfulCursorResponse( + fieldName, start, end, cursorId); } BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName, size_t start, size_t end, std::vector<size_t> cursorIds) { - NetworkInterfaceMock::InNetworkGuard ing(getNet()); - - BSONObjBuilder bob; - { - BSONArrayBuilder cursors; - int baseCursorValue = 1; - for (auto cursorId : cursorIds) { - BSONObjBuilder cursor; - BSONArrayBuilder batch; - ASSERT(start < end && end < INT_MAX); - for (size_t i = start; i <= end; ++i) { - batch.append(BSON("x" << static_cast<int>(i) * baseCursorValue).getOwned()); - } - cursor.append(fieldName, batch.arr()); - cursor.append("id", (long long)(cursorId)); - cursor.append("ns", "test.test"); - auto cursorObj = BSON("cursor" << cursor.done() << "ok" << 1); - cursors.append(cursorObj.getOwned()); - ++baseCursorValue; - } - bob.append("cursors", cursors.arr()); - } - bob.append("ok", 1); - - ASSERT(getNet()->hasReadyRequests()); - auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj()); - getNet()->runReadyNetworkOperations(); + return static_cast<Derived*>(this)->scheduleSuccessfulMultiCursorResponse( + fieldName, start, end, cursorIds); + } - return rcr.cmdObj.getOwned(); + void scheduleErrorResponse(Status error) { + return static_cast<Derived*>(this)->scheduleErrorResponse(error); + } + void blackHoleNextOutgoingRequest() { + return static_cast<Derived*>(this)->blackHoleNextOutgoingRequest(); } BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) { - NetworkInterfaceMock::InNetworkGuard ing(getNet()); - - ASSERT(getNet()->hasReadyRequests()); - auto rcr = getNet()->scheduleSuccessfulResponse( - BSON("cursorsKilled" << BSON_ARRAY((long 
long)(cursorId)) << "cursorsNotFound" - << BSONArray() << "cursorsAlive" << BSONArray() << "cursorsUnknown" - << BSONArray() << "ok" << 1)); - getNet()->runReadyNetworkOperations(); + return static_cast<Derived*>(this)->scheduleSuccessfulKillCursorResponse(cursorId); + } - return rcr.cmdObj.getOwned(); + TaskExecutorCursor makeTec(RemoteCommandRequest rcr, + TaskExecutorCursor::Options&& options = {}) { + return static_cast<Derived*>(this)->makeTec(rcr, std::move(options)); } bool hasReadyRequests() { - NetworkInterfaceMock::InNetworkGuard ing(getNet()); - return getNet()->hasReadyRequests(); + return static_cast<Derived*>(this)->hasReadyRequests(); } - ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make(); - ServiceContext::UniqueClient client; - ServiceContext::UniqueOperationContext opCtx; -}; + Base& asBase() { + return *this; + } -/** - * Ensure we work for a single simple batch - */ -TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) { - const auto findCmd = BSON("find" - << "test" - << "batchSize" << 2); - const CursorId cursorId = 0; + /** + * Ensure we work for a single simple batch + */ + void SingleBatchWorksTest() { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const CursorId cursorId = 0; - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); - TaskExecutorCursor tec(&getExecutor(), rcr); + TaskExecutorCursor tec = makeTec(rcr); - ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId)); + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId)); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); - ASSERT_FALSE(hasReadyRequests()); + ASSERT_FALSE(hasReadyRequests()); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + 
ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); - ASSERT_FALSE(tec.getNext(opCtx.get())); -} + ASSERT_FALSE(tec.getNext(opCtx.get())); + } -/** - * Ensure the firstBatch can be read correctly when multiple cursors are returned. - */ -TEST_F(TaskExecutorCursorFixture, MultipleCursorsSingleBatchSucceeds) { - const auto aggCmd = BSON("aggregate" + /** + * Ensure the firstBatch can be read correctly when multiple cursors are returned. + */ + void MultipleCursorsSingleBatchSucceedsTest() { + const auto aggCmd = BSON("aggregate" + << "test" + << "pipeline" + << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + + TaskExecutorCursor tec = makeTec(rcr); + + ASSERT_BSONOBJ_EQ(aggCmd, + scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0})); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + + ASSERT_FALSE(tec.getNext(opCtx.get())); + + auto cursorVec = tec.releaseAdditionalCursors(); + ASSERT_EQUALS(cursorVec.size(), 1); + auto secondCursor = std::move(cursorVec[0]); + + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4); + ASSERT_FALSE(hasReadyRequests()); + + ASSERT_FALSE(secondCursor.getNext(opCtx.get())); + } + /** + * The operation context under which we send the original cursor-establishing command + * can be destructed before getNext is called with new opCtx. Ensure that 'child' + * TaskExecutorCursors created from the original TEC's multi-cursor-response can safely + * operate if this happens/don't try and use the now-destroyed operation context. 
+ * See SERVER-69702 for context + */ + void ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest() { + auto lsid = makeLogicalSessionIdForTest(); + opCtx->setLogicalSessionId(lsid); + const auto aggCmd = BSON("aggregate" + << "test" + << "pipeline" + << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + TaskExecutorCursor tec = makeTec(rcr); + auto expected = BSON("aggregate" << "test" - << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)) + << "lsid" << lsid.toBSON()); + ASSERT_BSONOBJ_EQ(expected, + scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0})); + // Before calling getNext (and therefore spawning child TECs), destroy the opCtx + // we used to send the initial query and make a new one. + opCtx.reset(); + opCtx = client->makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + // Use the new opCtx to call getNext. The child TECs should not attempt to read from the + // now dead original opCtx. 
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); - TaskExecutorCursor tec(&getExecutor(), rcr); + ASSERT_FALSE(tec.getNext(opCtx.get())); - ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0})); + auto cursorVec = tec.releaseAdditionalCursors(); + ASSERT_EQUALS(cursorVec.size(), 1); + auto secondCursor = std::move(cursorVec[0]); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4); + ASSERT_FALSE(hasReadyRequests()); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + ASSERT_FALSE(secondCursor.getNext(opCtx.get())); + } - ASSERT_FALSE(tec.getNext(opCtx.get())); + void MultipleCursorsGetMoreWorksTest() { + const auto aggCmd = BSON("aggregate" + << "test" + << "pipeline" + << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + + std::vector<size_t> cursorIds{1, 2}; + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + + TaskExecutorCursor tec = makeTec(rcr); + + ASSERT_BSONOBJ_EQ(aggCmd, + scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds)); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + + auto cursorVec = tec.releaseAdditionalCursors(); + ASSERT_EQUALS(cursorVec.size(), 1); + + // If we try to getNext() at this point, we are interruptible and can timeout + ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), + ErrorCodes::ExceededTimeLimit, + [&] { tec.getNext(opCtx.get()); }), + DBException, + ErrorCodes::ExceededTimeLimit); + + // We can pick up after that interruption though + ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" 
+ << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorIds[0])); + + // Repeat for second cursor. + auto secondCursor = std::move(cursorVec[0]); + + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4); + + ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), + ErrorCodes::ExceededTimeLimit, + [&] { secondCursor.getNext(opCtx.get()); }), + DBException, + ErrorCodes::ExceededTimeLimit); + + ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection" + << "test"), + scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1])); + // Read second batch, then schedule EOF on both cursors. + // Then read final document for each. + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 5); + scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 6); + + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 6); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 7); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 8); + scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0); + ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 12); + + // Shouldn't have any more requests, both cursors are closed. 
+ ASSERT_FALSE(hasReadyRequests()); + + ASSERT_FALSE(tec.getNext(opCtx.get())); + ASSERT_FALSE(secondCursor.getNext(opCtx.get())); + } - auto cursorVec = tec.releaseAdditionalCursors(); - ASSERT_EQUALS(cursorVec.size(), 1); - auto secondCursor = std::move(cursorVec[0]); + /** + * Ensure we work if find fails (and that we receive the error code it failed with) + */ + void FailureInFindTest() { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4); - ASSERT_FALSE(hasReadyRequests()); + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); - ASSERT_FALSE(secondCursor.getNext(opCtx.get())); -} + TaskExecutorCursor tec = makeTec(rcr); -/** - * The operation context under which we send the original cursor-establishing command - * can be destructed before getNext is called with new opCtx. Ensure that 'child' - * TaskExecutorCursors created from the original TEC's multi-cursor-response can safely - * operate if this happens/don't try and use the now-destroyed operation context. 
- * See SERVER-69702 for context - */ -TEST_F(TaskExecutorCursorFixture, ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) { - auto lsid = makeLogicalSessionIdForTest(); - opCtx->setLogicalSessionId(lsid); - const auto aggCmd = BSON("aggregate" - << "test" - << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true))); - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); - TaskExecutorCursor tec(&getExecutor(), rcr); - auto expected = BSON("aggregate" - << "test" - << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)) - << "lsid" << lsid.toBSON()); - ASSERT_BSONOBJ_EQ(expected, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0})); - // Before calling getNext (and therefore spawning child TECs), destroy the opCtx - // we used to send the initial query and make a new one. - opCtx.reset(); - opCtx = client->makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - // Use the new opCtx to call getNext. The child TECs should not attempt to read from the - // now dead original opCtx. 
- ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + scheduleErrorResponse(Status(ErrorCodes::BadValue, "an error")); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + ASSERT_THROWS_CODE(tec.getNext(opCtx.get()), DBException, ErrorCodes::BadValue); + } - ASSERT_FALSE(tec.getNext(opCtx.get())); - auto cursorVec = tec.releaseAdditionalCursors(); - ASSERT_EQUALS(cursorVec.size(), 1); - auto secondCursor = std::move(cursorVec[0]); + /** + * Ensure multiple batches works correctly + */ + void MultipleBatchesWorksTest() { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + CursorId cursorId = 1; - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4); - ASSERT_FALSE(hasReadyRequests()); + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); - ASSERT_FALSE(secondCursor.getNext(opCtx.get())); -} + TaskExecutorCursor tec = makeTec(rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 3; + return opts; + }()); -TEST_F(TaskExecutorCursorFixture, MultipleCursorsGetMoreWorks) { - const auto aggCmd = BSON("aggregate" - << "test" - << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true))); + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); - std::vector<size_t> cursorIds{1, 2}; - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get()); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); - TaskExecutorCursor tec(&getExecutor(), rcr); + // ASSERT(hasReadyRequests()); - ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds)); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + // If we try to getNext() at this point, we are interruptible and can timeout + ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + 
Milliseconds(100), + ErrorCodes::ExceededTimeLimit, + [&] { tec.getNext(opCtx.get()); }), + DBException, + ErrorCodes::ExceededTimeLimit); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + // We can pick up after that interruption though + ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" + << "test" + << "batchSize" << 3), + scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorId)); - auto cursorVec = tec.releaseAdditionalCursors(); - ASSERT_EQUALS(cursorVec.size(), 1); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 5); - // If we try to getNext() at this point, we are interruptible and can timeout - ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), - ErrorCodes::ExceededTimeLimit, - [&] { tec.getNext(opCtx.get()); }), - DBException, - ErrorCodes::ExceededTimeLimit); + cursorId = 0; + scheduleSuccessfulCursorResponse("nextBatch", 6, 6, cursorId); - // We can pick up after that interruption though - ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" - << "test"), - scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorIds[0])); - - // Repeat for second cursor. - auto secondCursor = std::move(cursorVec[0]); - - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4); - - ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), - ErrorCodes::ExceededTimeLimit, - [&] { secondCursor.getNext(opCtx.get()); }), - DBException, - ErrorCodes::ExceededTimeLimit); - - ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection" - << "test"), - scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1])); - // Read second batch on both cursors. 
- ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 5); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 6); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 7); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 8); - - // Schedule EOF on both cursors. - scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0); - scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0); - - // Read final document. - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 6); - ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 12); - - // Shouldn't have any more requests, both cursors are closed. - ASSERT_FALSE(hasReadyRequests()); + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 6); - ASSERT_FALSE(tec.getNext(opCtx.get())); - ASSERT_FALSE(secondCursor.getNext(opCtx.get())); -} + // We don't issue extra getmores after returning a 0 cursor id + ASSERT_FALSE(hasReadyRequests()); -/** - * Ensure we work if find fails (and that we receive the error code it failed with) - */ -TEST_F(TaskExecutorCursorFixture, FailureInFind) { - const auto findCmd = BSON("find" - << "test" - << "batchSize" << 2); + ASSERT_FALSE(tec.getNext(opCtx.get())); + } - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + /** + * Ensure we allow empty firstBatch. 
+ */ + void EmptyFirstBatchTest() { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const auto getMoreCmd = BSON("getMore" << 1LL << "collection" + << "test" + << "batchSize" << 3); + const CursorId cursorId = 1; + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + + TaskExecutorCursor tec = makeTec(rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 3; + return opts; + }()); + + // Schedule a cursor response with an empty "firstBatch". Use end < start so we don't + // append any doc to "firstBatch". + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 0, cursorId)); + + stdx::thread th([&] { + // Wait for the getMore run by the getNext() below to be ready, and schedule a + // cursor response with a non-empty "nextBatch". + while (!hasReadyRequests()) { + sleepmillis(10); + } - TaskExecutorCursor tec(&getExecutor(), rcr); + ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 1, 1, 0)); + }); - { + // Verify that the first doc is the doc from the second batch. + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + + th.join(); + } + + /** + * Ensure we allow any empty non-initial batch. + */ + void EmptyNonInitialBatchTest() { + const auto findCmd = BSON("find" + << "test" + << "batchSize" << 2); + const auto getMoreCmd = BSON("getMore" << 1LL << "collection" + << "test" + << "batchSize" << 3); + const CursorId cursorId = 1; + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + + TaskExecutorCursor tec = makeTec(rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 3; + return opts; + }()); + + // Schedule a cursor response with a non-empty "firstBatch". + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId)); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + + // Schedule two consecutive cursor responses with empty "nextBatch". 
Use end < start so + // we don't append any doc to "nextBatch". + ASSERT_BSONOBJ_EQ(getMoreCmd, + scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId)); + + stdx::thread th([&] { + // Wait for the first getMore run by the getNext() below to be ready, and schedule a + // cursor response with an empty "nextBatch". + while (!hasReadyRequests()) { + sleepmillis(10); + } + + ASSERT_BSONOBJ_EQ(getMoreCmd, + scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId)); + + // Wait for the second getMore run by the getNext() below to be ready, and schedule a + // cursor response with a non-empty "nextBatch". + while (!hasReadyRequests()) { + sleepmillis(10); + } + + ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 2, 2, 0)); + }); + + // Verify that the next doc is the doc from the fourth batch. + ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + + th.join(); + } + + ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make(); + ServiceContext::UniqueClient client; + ServiceContext::UniqueOperationContext opCtx; +}; + +class NonPinningTaskExecutorCursorTestFixture + : public TaskExecutorCursorTestFixture<NonPinningTaskExecutorCursorTestFixture, + ThreadPoolExecutorTest> { +public: + void postSetUp() { + launchExecutorThread(); + } + + BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, + size_t start, + size_t end, + size_t cursorId) { NetworkInterfaceMock::InNetworkGuard ing(getNet()); + ASSERT(getNet()->hasReadyRequests()); - getNet()->scheduleErrorResponse(Status(ErrorCodes::BadValue, "an error")); + auto rcr = getNet()->scheduleSuccessfulResponse( + buildCursorResponse(fieldName, start, end, cursorId)); getNet()->runReadyNetworkOperations(); + + return rcr.cmdObj.getOwned(); } - ASSERT_THROWS_CODE(tec.getNext(opCtx.get()), DBException, ErrorCodes::BadValue); -} + BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName, + size_t start, + size_t end, + std::vector<size_t>
cursorIds) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); -/** - * Ensure early termination of the cursor calls killCursor (if we know about the cursor id) - */ -TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) { - const auto findCmd = BSON("find" - << "test" - << "batchSize" << 2); - const CursorId cursorId = 1; - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + ASSERT(getNet()->hasReadyRequests()); + auto rcr = getNet()->scheduleSuccessfulResponse( + buildMultiCursorResponse(fieldName, start, end, cursorIds)); + getNet()->runReadyNetworkOperations(); - { - TaskExecutorCursor tec(&getExecutor(), rcr); + return rcr.cmdObj.getOwned(); + } - scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); + BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); - ASSERT(tec.getNext(opCtx.get())); - } + ASSERT(getNet()->hasReadyRequests()); + auto rcr = getNet()->scheduleSuccessfulResponse( + BSON("cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound" + << BSONArray() << "cursorsAlive" << BSONArray() << "cursorsUnknown" + << BSONArray() << "ok" << 1)); + getNet()->runReadyNetworkOperations(); - // Black hole the pending `getMore` operation scheduled by the `TaskExecutorCursor`. 
- { - NetworkInterfaceMock::InNetworkGuard guard(getNet()); - getNet()->blackHole(getNet()->getFrontOfUnscheduledQueue()); + return rcr.cmdObj.getOwned(); } - ASSERT_BSONOBJ_EQ(BSON("killCursors" - << "test" - << "cursors" << BSON_ARRAY(1)), - scheduleSuccessfulKillCursorResponse(1)); -} + void scheduleErrorResponse(Status error) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); -/** - * Ensure multiple batches works correctly - */ -TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) { - const auto findCmd = BSON("find" - << "test" - << "batchSize" << 2); - CursorId cursorId = 1; + ASSERT(getNet()->hasReadyRequests()); + getNet()->scheduleErrorResponse(error); + getNet()->runReadyNetworkOperations(); + } - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + bool hasReadyRequests() { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + return getNet()->hasReadyRequests(); + } - TaskExecutorCursor tec(&getExecutor(), rcr, [] { - TaskExecutorCursor::Options opts; - opts.batchSize = 3; - return opts; - }()); + void blackHoleNextOutgoingRequest() { + NetworkInterfaceMock::InNetworkGuard guard(getNet()); + getNet()->blackHole(getNet()->getFrontOfUnscheduledQueue()); + } - scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); + TaskExecutorCursor makeTec(RemoteCommandRequest rcr, + TaskExecutorCursor::Options&& options = {}) { + return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options)); + } +}; - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); +class PinnedConnTaskExecutorCursorTestFixture + : public TaskExecutorCursorTestFixture<PinnedConnTaskExecutorCursorTestFixture, + PinnedConnectionTaskExecutorTest> { +public: + void postSetUp() {} + + BSONObj scheduleResponse(StatusWith<BSONObj> response) { + int32_t responseToId; + BSONObj cmdObjReceived; + auto pf = makePromiseFuture<void>(); + expectSinkMessage([&](Message m) { + responseToId = m.header().getId(); + auto opMsg = 
OpMsgRequest::parse(m); + cmdObjReceived = opMsg.body.removeField("$db").getOwned(); + pf.promise.emplaceValue(); + return Status::OK(); + }); + // Wait until we received the command request. + pf.future.get(); + + // Now we expect source message to be called and provide the response + expectSourceMessage([=]() { + rpc::OpMsgReplyBuilder replyBuilder; + replyBuilder.setCommandReply(response); + auto message = replyBuilder.done(); + message.header().setResponseToMsgId(responseToId); + return message; + }); + return cmdObjReceived; + } - ASSERT(hasReadyRequests()); + BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, + size_t start, + size_t end, + size_t cursorId) { + auto cursorResponse = buildCursorResponse(fieldName, start, end, cursorId); + return scheduleResponse(cursorResponse); + } - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); + BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName, + size_t start, + size_t end, + std::vector<size_t> cursorIds) { + auto cursorResponse = buildMultiCursorResponse(fieldName, start, end, cursorIds); + return scheduleResponse(cursorResponse); + } - // If we try to getNext() at this point, we are interruptible and can timeout - ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), - ErrorCodes::ExceededTimeLimit, - [&] { tec.getNext(opCtx.get()); }), - DBException, - ErrorCodes::ExceededTimeLimit); + void scheduleErrorResponse(Status error) { + scheduleResponse(error); + } - // We can pick up after that interruption though - ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection" - << "test" - << "batchSize" << 3), - scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorId)); + BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) { - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 5); + auto cursorResponse = +
BSON("cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound" + << BSONArray() << "cursorsAlive" << BSONArray() << "cursorsUnknown" + << BSONArray() << "ok" << 1); + return scheduleResponse(cursorResponse); + } - cursorId = 0; - scheduleSuccessfulCursorResponse("nextBatch", 6, 6, cursorId); + TaskExecutorCursor makeTec(RemoteCommandRequest rcr, + TaskExecutorCursor::Options&& options = {}) { + options.pinConnection = true; + return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options)); + } - // We don't issue extra getmores after returning a 0 cursor id - ASSERT_FALSE(hasReadyRequests()); + bool hasReadyRequests() { + return asBase().hasReadyRequests(); + } - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 6); + void blackHoleNextOutgoingRequest() { + auto pf = makePromiseFuture<void>(); + expectSinkMessage([&](Message m) { + pf.promise.emplaceValue(); + return Status(ErrorCodes::SocketException, "test"); + }); + pf.future.get(); + } +}; - ASSERT_FALSE(tec.getNext(opCtx.get())); +TEST_F(NonPinningTaskExecutorCursorTestFixture, SingleBatchWorks) { + SingleBatchWorksTest(); } -/** - * Ensure we allow empty firstBatch. 
- */ -TEST_F(TaskExecutorCursorFixture, EmptyFirstBatch) { - const auto findCmd = BSON("find" - << "test" - << "batchSize" << 2); - const auto getMoreCmd = BSON("getMore" << 1LL << "collection" - << "test" - << "batchSize" << 3); - const CursorId cursorId = 1; +TEST_F(PinnedConnTaskExecutorCursorTestFixture, SingleBatchWorks) { + SingleBatchWorksTest(); +} - RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); +TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) { + MultipleCursorsSingleBatchSucceedsTest(); +} - TaskExecutorCursor tec(&getExecutor(), rcr, [] { - TaskExecutorCursor::Options opts; - opts.batchSize = 3; - return opts; - }()); +TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) { + MultipleCursorsSingleBatchSucceedsTest(); +} - // Schedule a cursor response with an empty "firstBatch". Use end < start so we don't - // append any doc to "firstBatch". - ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 0, cursorId)); +TEST_F(NonPinningTaskExecutorCursorTestFixture, + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) { + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest(); +} - stdx::thread th([&] { - // Wait for the getMore run by the getNext() below to be ready, and schedule a - // cursor response with a non-empty "nextBatch". 
- while (!hasReadyRequests()) { - sleepmillis(10); - } +TEST_F(PinnedConnTaskExecutorCursorTestFixture, + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) { + ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest(); +} +TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) { + MultipleCursorsGetMoreWorksTest(); +} - ASSERT_BSONOBJ_EQ(getMoreCmd, - scheduleSuccessfulCursorResponse("nextBatch", 1, 1, cursorId)); - }); +TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) { + MultipleCursorsGetMoreWorksTest(); +} - // Verify that the first doc is the doc from the second batch. - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); +TEST_F(NonPinningTaskExecutorCursorTestFixture, FailureInFind) { + FailureInFindTest(); +} - th.join(); +TEST_F(PinnedConnTaskExecutorCursorTestFixture, FailureInFind) { + FailureInFindTest(); } /** - * Ensure we allow any empty non-initial batch. + * Ensure early termination of the cursor calls killCursor (if we know about the cursor id) + * Only applicable to the unpinned case - if the connection is pinned, and a getMore is + * in progress and/or fails, the most we can do is kill the connection. We can't re-use + * the connection to send killCursors. */ -TEST_F(TaskExecutorCursorFixture, EmptyNonInitialBatch) { +TEST_F(NonPinningTaskExecutorCursorTestFixture, EarlyReturnKillsCursor) { const auto findCmd = BSON("find" << "test" << "batchSize" << 2); - const auto getMoreCmd = BSON("getMore" << 1LL << "collection" - << "test" - << "batchSize" << 3); const CursorId cursorId = 1; RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); - TaskExecutorCursor tec(&getExecutor(), rcr, [] { - TaskExecutorCursor::Options opts; - opts.batchSize = 3; - return opts; - }()); + { + TaskExecutorCursor tec = makeTec(rcr); - // Schedule a cursor response with a non-empty "firstBatch".
- ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId)); + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId); - ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1); + ASSERT(tec.getNext(opCtx.get())); - // Schedule two consecutive cursor responses with empty "nextBatch". Use end < start so - // we don't append any doc to "nextBatch". - ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId)); + // Black hole the pending `getMore` operation scheduled by the `TaskExecutorCursor`. + blackHoleNextOutgoingRequest(); + } - stdx::thread th([&] { - // Wait for the first getMore run by the getNext() below to be ready, and schedule a - // cursor response with a non-empty "nextBatch". - while (!hasReadyRequests()) { - sleepmillis(10); - } - ASSERT_BSONOBJ_EQ(getMoreCmd, - scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId)); + ASSERT_BSONOBJ_EQ(BSON("killCursors" + << "test" + << "cursors" << BSON_ARRAY(1)), + scheduleSuccessfulKillCursorResponse(1)); +} - // Wait for the second getMore run by the getNext() below to be ready, and schedule a - // cursor response with a non-empty "nextBatch". - while (!hasReadyRequests()) { - sleepmillis(10); - } +TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleBatchesWorks) { + MultipleBatchesWorksTest(); +} - ASSERT_BSONOBJ_EQ(getMoreCmd, - scheduleSuccessfulCursorResponse("nextBatch", 2, 2, cursorId)); - }); +TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleBatchesWorks) { + MultipleBatchesWorksTest(); +} + +TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyFirstBatch) { + EmptyFirstBatchTest(); +} + +TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyFirstBatch) { + EmptyFirstBatchTest(); +} - // Verify that the next doc is the doc from the fourth batch. 
- ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2); +TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyNonInitialBatch) { + EmptyNonInitialBatchTest(); +} - th.join(); +TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyNonInitialBatch) { + EmptyNonInitialBatchTest(); } /** - * Ensure lsid is passed in all stages of querying + * Ensure the LSID is passed in all stages of querying. Need to test the + * pinning case separately because of difference around killCursor. */ -TEST_F(TaskExecutorCursorFixture, LsidIsPassed) { +TEST_F(NonPinningTaskExecutorCursorTestFixture, LsidIsPassed) { auto lsid = makeLogicalSessionIdForTest(); opCtx->setLogicalSessionId(lsid); @@ -538,11 +765,11 @@ TEST_F(TaskExecutorCursorFixture, LsidIsPassed) { RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); boost::optional<TaskExecutorCursor> tec; - tec.emplace(&getExecutor(), rcr, []() { + tec.emplace(makeTec(rcr, []() { TaskExecutorCursor::Options opts; opts.batchSize = 1; return opts; - }()); + }())); // lsid in the first batch ASSERT_BSONOBJ_EQ(BSON("find" diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 9c95467118f..900ed7508c8 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -231,6 +231,9 @@ private: // Lifecycle state of this executor. stdx::condition_variable _stateChange; State _state = preStart; + + friend std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor( + std::shared_ptr<TaskExecutor>); }; } // namespace executor |