summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2023-04-05 13:50:46 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-17 18:36:47 +0000
commitf97fe6b211a61396c1db7c9210e2655c14671a9d (patch)
tree644f54d3d19c0773e2a16e65ee0e3396b2fea5eb
parente3fb6a12415cb87efbc4afb1e5dfff226fd808c1 (diff)
downloadmongo-f97fe6b211a61396c1db7c9210e2655c14671a9d.tar.gz
SERVER-73613 Let TaskExecutorCursor Pin Connections When Requested
(cherry picked from commit db68d8f57be484d25ba4c37bd049e0c07cb7d645)
-rw-r--r--src/mongo/executor/SConscript19
-rw-r--r--src/mongo/executor/pinned_connection_task_executor_factory.cpp54
-rw-r--r--src/mongo/executor/pinned_connection_task_executor_factory.h56
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp93
-rw-r--r--src/mongo/executor/task_executor_cursor.h15
-rw-r--r--src/mongo/executor/task_executor_cursor_integration_test.cpp87
-rw-r--r--src/mongo/executor/task_executor_cursor_test.cpp899
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h3
8 files changed, 882 insertions, 344 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index bf2c8cd943b..5e2c76d94fd 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -287,6 +287,9 @@ env.Library(
'$BUILD_DIR/mongo/db/query/command_request_response',
'task_executor_interface',
],
+ LIBDEPS_PRIVATE=[
+ 'pinned_connection_task_executor_factory',
+ ],
)
env.Library(
@@ -302,6 +305,21 @@ env.Library(
],
)
+env.Library(
+ target='pinned_connection_task_executor_factory',
+ source=[
+ 'pinned_connection_task_executor_factory.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_interface',
+ ],
+ LIBDEPS_PRIVATE=[
+ 'network_interface',
+ 'pinned_connection_task_executor',
+ 'thread_pool_task_executor',
+ ],
+)
+
env.CppUnitTest(
target='executor_test',
source=[
@@ -342,6 +360,7 @@ env.CppIntegrationTest(
'$BUILD_DIR/mongo/db/wire_version',
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/network_interface_thread_pool',
+ '$BUILD_DIR/mongo/executor/pinned_connection_task_executor',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'$BUILD_DIR/mongo/transport/transport_layer_egress_init',
'$BUILD_DIR/mongo/util/concurrency/thread_pool',
diff --git a/src/mongo/executor/pinned_connection_task_executor_factory.cpp b/src/mongo/executor/pinned_connection_task_executor_factory.cpp
new file mode 100644
index 00000000000..e5871adbaa0
--- /dev/null
+++ b/src/mongo/executor/pinned_connection_task_executor_factory.cpp
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <memory>
+
+#include "mongo/executor/pinned_connection_task_executor_factory.h"
+
+#include "mongo/executor/pinned_connection_task_executor.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+
+namespace mongo {
+namespace executor {
+
+std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec,
+ NetworkInterface* net) {
+ return std::make_shared<PinnedConnectionTaskExecutor>(std::move(exec), net);
+}
+
+std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec) {
+ auto tpte = dynamic_cast<ThreadPoolTaskExecutor*>(exec.get());
+ invariant(tpte,
+ "Connection-pinning task executors can only be constructed from "
+ "ThreadPoolTaskExecutor unless an explicit NetworkInterface is provided.");
+ return makePinnedConnectionTaskExecutor(std::move(exec), tpte->_net.get());
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/pinned_connection_task_executor_factory.h b/src/mongo/executor/pinned_connection_task_executor_factory.h
new file mode 100644
index 00000000000..dbf79481bff
--- /dev/null
+++ b/src/mongo/executor/pinned_connection_task_executor_factory.h
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/task_executor.h"
+
+namespace mongo {
+namespace executor {
+
+/**
+ * Returns a new TaskExecutor that does all of its RPC execution over the same transport session.
+ * The returned executor uses `exec`'s execution resources and acquires the transport session from
+ * `net`.
+ */
+std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec,
+ NetworkInterface* net);
+
+/**
+ * Returns a new TaskExecutor that does all of its RPC execution over the same transport session.
+ * The provided executor _must_ be a ThreadPoolTaskExecutor, and its underlying execution and
+ * network resources will be used by the returned executor.
+ */
+std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(std::shared_ptr<TaskExecutor> exec);
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
index c8011fe9a2b..957dcd8f563 100644
--- a/src/mongo/executor/task_executor_cursor.cpp
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -36,30 +36,45 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/query/getmore_command_gen.h"
#include "mongo/db/query/kill_cursors_gen.h"
+#include "mongo/executor/pinned_connection_task_executor_factory.h"
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/time_support.h"
namespace mongo {
namespace executor {
+namespace {
+MONGO_FAIL_POINT_DEFINE(blockBeforePinnedExecutorIsDestroyedOnUnderlying);
+} // namespace
-TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor,
+TaskExecutorCursor::TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor,
const RemoteCommandRequest& rcr,
Options&& options)
- : _executor(executor), _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) {
+ : _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) {
if (rcr.opCtx) {
_lsid = rcr.opCtx->getLogicalSessionId();
}
+ if (_options.pinConnection) {
+ _executor = makePinnedConnectionTaskExecutor(executor);
+ _underlyingExecutor = std::move(executor);
+ } else {
+ _executor = std::move(executor);
+ }
_runRemoteCommand(_createRequest(_rcr.opCtx, _rcr.cmdObj));
}
-TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor,
+TaskExecutorCursor::TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor,
+ std::shared_ptr<executor::TaskExecutor> underlyingExec,
CursorResponse&& response,
RemoteCommandRequest& rcr,
Options&& options)
- : _executor(executor), _rcr(rcr), _options(std::move(options)), _batchIter(_batch.end()) {
+ : _executor(std::move(executor)),
+ _underlyingExecutor(std::move(underlyingExec)),
+ _rcr(rcr),
+ _options(std::move(options)),
+ _batchIter(_batch.end()) {
tassert(6253101, "rcr must have an opCtx to use construct cursor from response", rcr.opCtx);
_lsid = rcr.opCtx->getLogicalSessionId();
@@ -67,7 +82,8 @@ TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor,
}
TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other)
- : _executor(other._executor),
+ : _executor(std::move(other._executor)),
+ _underlyingExecutor(std::move(other._underlyingExecutor)),
_rcr(other._rcr),
_options(std::move(other._options)),
_lsid(other._lsid),
@@ -98,15 +114,19 @@ TaskExecutorCursor::TaskExecutorCursor(TaskExecutorCursor&& other)
TaskExecutorCursor::~TaskExecutorCursor() {
try {
- if (_cursorId < kMinLegalCursorId) {
+ if (_cursorId < kMinLegalCursorId || _options.pinConnection) {
// The initial find to establish the cursor has to be canceled to avoid leaking cursors.
// Once the cursor is established, killing the cursor will interrupt any ongoing
// `getMore` operation.
+ // Additionally, in pinned mode, we should cancel any in-progress RPC if there is one,
+ // even at the cost of churning the connection, because it's the only way to interrupt
+ // the ongoing operation.
if (_cmdState) {
_executor->cancel(_cmdState->cbHandle);
}
-
- return;
+ if (_cursorId < kMinLegalCursorId) {
+ return;
+ }
}
// We deliberately ignore failures to kill the cursor. This "best effort" is acceptable
@@ -115,15 +135,42 @@ TaskExecutorCursor::~TaskExecutorCursor() {
// That timeout mechanism could be the default cursor timeout, or the logical session
// timeout if an lsid is used.
//
- // Killing the cursor also interrupts any ongoing getMore operations on this cursor. Avoid
- // canceling the remote command through its callback handle as that may close the underlying
- // connection.
- _executor
- ->scheduleRemoteCommand(
- _createRequest(nullptr,
- KillCursorsCommandRequest(_ns, {_cursorId}).toBSON(BSONObj{})),
- [](const auto&) {})
- .isOK();
+ // In non-pinned mode, killing the cursor also interrupts any ongoing getMore operations on
+ // this cursor. Avoid canceling the remote command through its callback handle as that may
+ // close the underlying connection.
+ //
+ // In pinned mode, we must await completion of the killCursors to safely reuse the pinned
+ // connection. This requires allocating an executor thread (from `_underlyingExecutor`) upon
+ // completion of the killCursors command to shutdown and destroy the pinned executor. This
+ // is necessary as joining an executor from its own threads results in a deadlock.
+ TaskExecutor::RemoteCommandCallbackFn callbackToRun = [](const auto&) {};
+ if (_options.pinConnection) {
+ invariant(_underlyingExecutor,
+ "TaskExecutorCursor in pinning mode must have an underlying executor");
+ callbackToRun = [main = _executor, underlying = _underlyingExecutor](const auto&) {
+ underlying->schedule([main = std::move(main)](const auto&) {
+ if (MONGO_unlikely(
+ blockBeforePinnedExecutorIsDestroyedOnUnderlying.shouldFail())) {
+ LOGV2(7361300,
+ "Hanging before destroying a TaskExecutorCursor's pinning executor.");
+ blockBeforePinnedExecutorIsDestroyedOnUnderlying.pauseWhileSet();
+ }
+ // Returning from this callback will destroy the pinned executor on
+ // underlying if this is the last TaskExecutorCursor using that pinned executor.
+ });
+ };
+ }
+ auto swCallback = _executor->scheduleRemoteCommand(
+ _createRequest(nullptr, KillCursorsCommandRequest(_ns, {_cursorId}).toBSON(BSONObj{})),
+ callbackToRun);
+
+ // It's possible the executor is already shutdown and rejects work. If so, run the callback
+ // inline.
+ if (!swCallback.isOK()) {
+ TaskExecutor::RemoteCommandCallbackArgs args(
+ _executor.get(), {}, {}, swCallback.getStatus());
+ callbackToRun(args);
+ }
} catch (const DBException& ex) {
LOGV2(6531704,
"Encountered an error while destroying a cursor executor",
@@ -245,11 +292,19 @@ void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
_processResponse(opCtx, std::move(cr));
// If we have more responses, build them into cursors then hold them until a caller accesses
// them. Skip the first response, we used it to populate this cursor.
+ // Ensure we update the RCR we give to each 'child cursor' with the current opCtx.
+ auto freshRcr = _createRequest(opCtx, _rcr.cmdObj);
+ auto copyOptions = [&] {
+ TaskExecutorCursor::Options options;
+ options.pinConnection = _options.pinConnection;
+ return options;
+ };
for (unsigned int i = 1; i < cursorResponses.size(); ++i) {
_additionalCursors.emplace_back(_executor,
+ _underlyingExecutor,
uassertStatusOK(std::move(cursorResponses[i])),
- _rcr,
- TaskExecutorCursor::Options());
+ freshRcr,
+ copyOptions());
}
}
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
index 728c1441880..5ecf48820d9 100644
--- a/src/mongo/executor/task_executor_cursor.h
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -68,6 +68,8 @@ public:
struct Options {
boost::optional<int64_t> batchSize;
+ bool pinConnection{false};
+ Options() {}
};
/**
@@ -78,7 +80,7 @@ public:
* opCtx - The Logical Session Id from the initial command is carried over in all later stages.
* NOTE - the actual command must not include the lsid
*/
- explicit TaskExecutorCursor(executor::TaskExecutor* executor,
+ explicit TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor,
const RemoteCommandRequest& rcr,
Options&& options = {});
@@ -87,8 +89,11 @@ public:
* The executor is used for subsequent getMore calls. Uses the original RemoteCommandRequest
* to build subsequent commands. Takes ownership of the CursorResponse and gives it to the new
* cursor.
+ * If the cursor should reuse the original transport connection that opened the original
+ * cursor, make sure the pinning executor that was used to open that cursor is provided.
*/
- TaskExecutorCursor(executor::TaskExecutor* executor,
+ TaskExecutorCursor(std::shared_ptr<executor::TaskExecutor> executor,
+ std::shared_ptr<executor::TaskExecutor> underlyingExec,
CursorResponse&& response,
RemoteCommandRequest& rcr,
Options&& options = {});
@@ -191,7 +196,11 @@ private:
*/
const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd);
- executor::TaskExecutor* const _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
+ // If we are pinning connections, we need to keep a separate reference to the
+ // non-pinning, normal executor, so that we can shut down the pinned executor
+ // out-of-line.
+ std::shared_ptr<executor::TaskExecutor> _underlyingExecutor;
// Used as a scratch pad for the successive scheduleRemoteCommand calls
RemoteCommandRequest _rcr;
diff --git a/src/mongo/executor/task_executor_cursor_integration_test.cpp b/src/mongo/executor/task_executor_cursor_integration_test.cpp
index 764a9401bc8..8a8433afd72 100644
--- a/src/mongo/executor/task_executor_cursor_integration_test.cpp
+++ b/src/mongo/executor/task_executor_cursor_integration_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/pinned_connection_task_executor.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/integration_test.h"
@@ -55,20 +56,25 @@ public:
}
void setUp() override {
- std::shared_ptr<NetworkInterface> ni = makeNetworkInterface("TaskExecutorCursorTest");
- auto tp = std::make_unique<NetworkInterfaceThreadPool>(ni.get());
+ _ni = makeNetworkInterface("TaskExecutorCursorTest");
+ auto tp = std::make_unique<NetworkInterfaceThreadPool>(_ni.get());
- _executor = std::make_unique<ThreadPoolTaskExecutor>(std::move(tp), std::move(ni));
+ _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(tp), _ni);
_executor->startup();
};
void tearDown() override {
_executor->shutdown();
+ _executor->join();
_executor.reset();
};
- TaskExecutor* executor() {
- return _executor.get();
+ std::shared_ptr<TaskExecutor> executor() {
+ return _executor;
+ }
+
+ auto net() {
+ return _ni.get();
}
auto makeOpCtx() {
@@ -93,7 +99,8 @@ public:
private:
ServiceContext::UniqueServiceContext _serviceCtx = ServiceContext::make();
- std::unique_ptr<ThreadPoolTaskExecutor> _executor;
+ std::shared_ptr<ThreadPoolTaskExecutor> _executor;
+ std::shared_ptr<NetworkInterface> _ni;
ServiceContext::UniqueClient _client = _serviceCtx->makeClient("TaskExecutorCursorTest");
};
@@ -112,7 +119,7 @@ size_t createTestData(std::string ns, size_t numDocs) {
return dbclient->count(NamespaceString(ns));
}
-// Test that we can actually use a TaskExecutorCursor to read multiple batches from a remote host
+// Test that we can actually use a TaskExecutorCursor to read multiple batches from a remote host.
TEST_F(TaskExecutorCursorFixture, Basic) {
const size_t numDocs = 100;
ASSERT_EQ(createTestData("test.test", numDocs), numDocs);
@@ -139,6 +146,70 @@ TEST_F(TaskExecutorCursorFixture, Basic) {
ASSERT_EQUALS(count, numDocs);
}
+// Test that we can actually use a TaskExecutorCursor that pins it's connection to read multiple
+// batches from a remote host.
+TEST_F(TaskExecutorCursorFixture, BasicPinned) {
+ const size_t numDocs = 100;
+ ASSERT_EQ(createTestData("test.test", numDocs), numDocs);
+
+ auto opCtx = makeOpCtx();
+ RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(),
+ "test",
+ BSON("find"
+ << "test"
+ << "batchSize" << 10),
+ opCtx.get());
+
+ TaskExecutorCursor tec(executor(), rcr, [this] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 10;
+ opts.pinConnection = true;
+ return opts;
+ }());
+
+ size_t count = 0;
+ while (auto doc = tec.getNext(opCtx.get())) {
+ count++;
+ }
+
+ ASSERT_EQUALS(count, numDocs);
+}
+
+// Test that when a TaskExecutorCursor is used in pinning-mode, the pinned executor's destruction
+// is scheduled on the underlying executor.
+TEST_F(TaskExecutorCursorFixture, PinnedExecutorDestroyedOnUnderlying) {
+ const size_t numDocs = 100;
+ ASSERT_EQ(createTestData("test.test", numDocs), numDocs);
+
+ auto opCtx = makeOpCtx();
+ RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(),
+ "test",
+ BSON("find"
+ << "test"
+ << "batchSize" << 10),
+ opCtx.get());
+
+ boost::optional<TaskExecutorCursor> tec;
+ tec.emplace(executor(), rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 10;
+ opts.pinConnection = true;
+ return opts;
+ }());
+ // Fetch a documents to make sure the TEC was initialized properly.
+ ASSERT(tec->getNext(opCtx.get()));
+ // Enable the failpoint in the integration test process.
+ {
+ FailPointEnableBlock fpb("blockBeforePinnedExecutorIsDestroyedOnUnderlying");
+ auto initialTimesEntered = fpb.initialTimesEntered();
+ // Destroy the TEC and ensure we reach the code block that will destroy the pinned executor.
+ tec.reset();
+ LOGV2(7361301, "Waiting for TaskExecutorCursor to destroy its pinning executor.");
+ fpb->waitForTimesEntered(initialTimesEntered + 1);
+ }
+ // Allow the pinned executor's destruction to proceed.
+}
+
/**
* Verifies that the underlying connection used to run `getMore` commands remains open, even after
* the instance of `TaskExecutorCursor` is destroyed.
@@ -238,7 +309,7 @@ TEST_F(TaskExecutorCursorFixture, ConnectionRemainsOpenAfterKillingTheCursor) {
const auto afterStats = getConnectionStatsForTarget();
auto countOpenConns = [](const ConnectionStatsPer& stats) {
- return stats.inUse + stats.available + stats.refreshing;
+ return stats.inUse + stats.available + stats.refreshing + stats.leased;
};
// Verify that no connection is created or closed.
diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp
index 563ca1f9147..011ba9d8391 100644
--- a/src/mongo/executor/task_executor_cursor_test.cpp
+++ b/src/mongo/executor/task_executor_cursor_test.cpp
@@ -32,8 +32,11 @@
#include "mongo/platform/basic.h"
#include "mongo/db/concurrency/locker_noop_client_observer.h"
+#include "mongo/executor/pinned_connection_task_executor.h"
+#include "mongo/executor/pinned_connection_task_executor_test_fixture.h"
#include "mongo/executor/task_executor_cursor.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/unittest/bson_test_util.h"
#include "mongo/unittest/unittest.h"
@@ -41,450 +44,718 @@ namespace mongo {
namespace executor {
namespace {
+BSONObj buildCursorResponse(StringData fieldName, size_t start, size_t end, size_t cursorId) {
+ BSONObjBuilder bob;
+ {
+ BSONObjBuilder cursor(bob.subobjStart("cursor"));
+ {
+ BSONArrayBuilder batch(cursor.subarrayStart(fieldName));
+
+ for (size_t i = start; i <= end; ++i) {
+ BSONObjBuilder doc(batch.subobjStart());
+ doc.append("x", int(i));
+ }
+ }
+ cursor.append("id", (long long)(cursorId));
+ cursor.append("ns", "test.test");
+ }
+ bob.append("ok", int(1));
+ return bob.obj();
+}
+
+BSONObj buildMultiCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ std::vector<size_t> cursorIds) {
+ BSONObjBuilder bob;
+ {
+ BSONArrayBuilder cursors;
+ int baseCursorValue = 1;
+ for (auto cursorId : cursorIds) {
+ BSONObjBuilder cursor;
+ BSONArrayBuilder batch;
+ ASSERT(start < end && end < INT_MAX);
+ for (size_t i = start; i <= end; ++i) {
+ batch.append(BSON("x" << static_cast<int>(i) * baseCursorValue).getOwned());
+ }
+ cursor.append(fieldName, batch.arr());
+ cursor.append("id", (long long)(cursorId));
+ cursor.append("ns", "test.test");
+ auto cursorObj = BSON("cursor" << cursor.done() << "ok" << 1);
+ cursors.append(cursorObj.getOwned());
+ ++baseCursorValue;
+ }
+ bob.append("cursors", cursors.arr());
+ }
+ bob.append("ok", 1);
+ return bob.obj();
+}
+
/**
* Fixture for the task executor cursor tests which offers some convenience methods to help with
- * scheduling responses
+ * scheduling responses. Uses the CRTP pattern so that the tests can be shared between child-classes
+ * that provide their own implementations of the network-mocking needed for the tests.
*/
-class TaskExecutorCursorFixture : public ThreadPoolExecutorTest {
+template <typename Derived, typename Base>
+class TaskExecutorCursorTestFixture : public Base {
public:
- TaskExecutorCursorFixture() {
+ TaskExecutorCursorTestFixture() {
serviceCtx->registerClientObserver(std::make_unique<LockerNoopClientObserver>());
}
void setUp() override {
- ThreadPoolExecutorTest::setUp();
-
+ Base::setUp();
client = serviceCtx->makeClient("TaskExecutorCursorTest");
opCtx = client->makeOperationContext();
-
- launchExecutorThread();
+ static_cast<Derived*>(this)->postSetUp();
}
void tearDown() override {
opCtx.reset();
client.reset();
- ThreadPoolExecutorTest::tearDown();
+ Base::tearDown();
}
BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
size_t start,
size_t end,
size_t cursorId) {
- NetworkInterfaceMock::InNetworkGuard ing(getNet());
-
- BSONObjBuilder bob;
- {
- BSONObjBuilder cursor(bob.subobjStart("cursor"));
- {
- BSONArrayBuilder batch(cursor.subarrayStart(fieldName));
-
- for (size_t i = start; i <= end; ++i) {
- BSONObjBuilder doc(batch.subobjStart());
- doc.append("x", int(i));
- }
- }
- cursor.append("id", (long long)(cursorId));
- cursor.append("ns", "test.test");
- }
- bob.append("ok", int(1));
-
- ASSERT(getNet()->hasReadyRequests());
- auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj());
- getNet()->runReadyNetworkOperations();
-
- return rcr.cmdObj.getOwned();
+ return static_cast<Derived*>(this)->scheduleSuccessfulCursorResponse(
+ fieldName, start, end, cursorId);
}
BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName,
size_t start,
size_t end,
std::vector<size_t> cursorIds) {
- NetworkInterfaceMock::InNetworkGuard ing(getNet());
-
- BSONObjBuilder bob;
- {
- BSONArrayBuilder cursors;
- int baseCursorValue = 1;
- for (auto cursorId : cursorIds) {
- BSONObjBuilder cursor;
- BSONArrayBuilder batch;
- ASSERT(start < end && end < INT_MAX);
- for (size_t i = start; i <= end; ++i) {
- batch.append(BSON("x" << static_cast<int>(i) * baseCursorValue).getOwned());
- }
- cursor.append(fieldName, batch.arr());
- cursor.append("id", (long long)(cursorId));
- cursor.append("ns", "test.test");
- auto cursorObj = BSON("cursor" << cursor.done() << "ok" << 1);
- cursors.append(cursorObj.getOwned());
- ++baseCursorValue;
- }
- bob.append("cursors", cursors.arr());
- }
- bob.append("ok", 1);
-
- ASSERT(getNet()->hasReadyRequests());
- auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj());
- getNet()->runReadyNetworkOperations();
+ return static_cast<Derived*>(this)->scheduleSuccessfulMultiCursorResponse(
+ fieldName, start, end, cursorIds);
+ }
- return rcr.cmdObj.getOwned();
+ void scheduleErrorResponse(Status error) {
+ return static_cast<Derived*>(this)->scheduleErrorResponse(error);
+ }
+ void blackHoleNextOutgoingRequest() {
+ return static_cast<Derived*>(this)->blackHoleNextOutgoingRequest();
}
BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) {
- NetworkInterfaceMock::InNetworkGuard ing(getNet());
-
- ASSERT(getNet()->hasReadyRequests());
- auto rcr = getNet()->scheduleSuccessfulResponse(
- BSON("cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound"
- << BSONArray() << "cursorsAlive" << BSONArray() << "cursorsUnknown"
- << BSONArray() << "ok" << 1));
- getNet()->runReadyNetworkOperations();
+ return static_cast<Derived*>(this)->scheduleSuccessfulKillCursorResponse(cursorId);
+ }
- return rcr.cmdObj.getOwned();
+ TaskExecutorCursor makeTec(RemoteCommandRequest rcr,
+ TaskExecutorCursor::Options&& options = {}) {
+ return static_cast<Derived*>(this)->makeTec(rcr, std::move(options));
}
bool hasReadyRequests() {
- NetworkInterfaceMock::InNetworkGuard ing(getNet());
- return getNet()->hasReadyRequests();
+ return static_cast<Derived*>(this)->hasReadyRequests();
}
- ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make();
- ServiceContext::UniqueClient client;
- ServiceContext::UniqueOperationContext opCtx;
-};
+ Base& asBase() {
+ return *this;
+ }
-/**
- * Ensure we work for a single simple batch
- */
-TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) {
- const auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 2);
- const CursorId cursorId = 0;
+ /**
+ * Ensure we work for a single simple batch
+ */
+ void SingleBatchWorksTest() {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const CursorId cursorId = 0;
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
- TaskExecutorCursor tec(&getExecutor(), rcr);
+ TaskExecutorCursor tec = makeTec(rcr);
- ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId));
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId));
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
- ASSERT_FALSE(hasReadyRequests());
+ ASSERT_FALSE(hasReadyRequests());
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
- ASSERT_FALSE(tec.getNext(opCtx.get()));
-}
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+ }
-/**
- * Ensure the firstBatch can be read correctly when multiple cursors are returned.
- */
-TEST_F(TaskExecutorCursorFixture, MultipleCursorsSingleBatchSucceeds) {
- const auto aggCmd = BSON("aggregate"
+ /**
+ * Ensure the firstBatch can be read correctly when multiple cursors are returned.
+ */
+ void MultipleCursorsSingleBatchSucceedsTest() {
+ const auto aggCmd = BSON("aggregate"
+ << "test"
+ << "pipeline"
+ << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+
+ TaskExecutorCursor tec = makeTec(rcr);
+
+ ASSERT_BSONOBJ_EQ(aggCmd,
+ scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0}));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+
+ auto cursorVec = tec.releaseAdditionalCursors();
+ ASSERT_EQUALS(cursorVec.size(), 1);
+ auto secondCursor = std::move(cursorVec[0]);
+
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4);
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
+ }
+ /**
+ * The operation context under which we send the original cursor-establishing command
+ * can be destructed before getNext is called with new opCtx. Ensure that 'child'
+ * TaskExecutorCursors created from the original TEC's multi-cursor-response can safely
+ * operate if this happens/don't try and use the now-destroyed operation context.
+ * See SERVER-69702 for context
+ */
+ void ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest() {
+ auto lsid = makeLogicalSessionIdForTest();
+ opCtx->setLogicalSessionId(lsid);
+ const auto aggCmd = BSON("aggregate"
+ << "test"
+ << "pipeline"
+ << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+ TaskExecutorCursor tec = makeTec(rcr);
+ auto expected = BSON("aggregate"
<< "test"
- << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
+ << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true))
+ << "lsid" << lsid.toBSON());
+ ASSERT_BSONOBJ_EQ(expected,
+ scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0}));
+ // Before calling getNext (and therefore spawning child TECs), destroy the opCtx
+ // we used to send the initial query and make a new one.
+ opCtx.reset();
+ opCtx = client->makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ // Use the new opCtx to call getNext. The child TECs should not attempt to read from the
+ // now dead original opCtx.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+ auto cursorVec = tec.releaseAdditionalCursors();
+ ASSERT_EQUALS(cursorVec.size(), 1);
+ auto secondCursor = std::move(cursorVec[0]);
- TaskExecutorCursor tec(&getExecutor(), rcr);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4);
+ ASSERT_FALSE(hasReadyRequests());
- ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, {0, 0}));
+ ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
+ }
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+ void MultipleCursorsGetMoreWorksTest() {
+ const auto aggCmd = BSON("aggregate"
+ << "test"
+ << "pipeline"
+ << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
+
+ std::vector<size_t> cursorIds{1, 2};
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+
+ TaskExecutorCursor tec = makeTec(rcr);
+
+ ASSERT_BSONOBJ_EQ(aggCmd,
+ scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
+
+ auto cursorVec = tec.releaseAdditionalCursors();
+ ASSERT_EQUALS(cursorVec.size(), 1);
+
+ // If we try to getNext() at this point, we are interruptible and can timeout
+ ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { tec.getNext(opCtx.get()); }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+
+ // We can pick up after that interruption though
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
+ << "test"),
+ scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorIds[0]));
+
+ // Repeat for second cursor.
+ auto secondCursor = std::move(cursorVec[0]);
+
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 2);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 4);
+
+ ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { secondCursor.getNext(opCtx.get()); }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection"
+ << "test"),
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1]));
+ // Read second batch, then schedule EOF on both cursors.
+ // Then read final document for each.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 5);
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 6);
+
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 6);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 7);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 8);
+ scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0);
+ ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).value()["x"].Int(), 12);
+
+ // Shouldn't have any more requests, both cursors are closed.
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+ ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
+ }
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+ /**
+ * Ensure we work if find fails (and that we receive the error code it failed with)
+ */
+ void FailureInFindTest() {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
- ASSERT_FALSE(tec.getNext(opCtx.get()));
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
- auto cursorVec = tec.releaseAdditionalCursors();
- ASSERT_EQUALS(cursorVec.size(), 1);
- auto secondCursor = std::move(cursorVec[0]);
+ TaskExecutorCursor tec = makeTec(rcr);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 2);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 4);
- ASSERT_FALSE(hasReadyRequests());
+ scheduleErrorResponse(Status(ErrorCodes::BadValue, "an error"));
- ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
-}
+ ASSERT_THROWS_CODE(tec.getNext(opCtx.get()), DBException, ErrorCodes::BadValue);
+ }
-TEST_F(TaskExecutorCursorFixture, MultipleCursorsGetMoreWorks) {
- const auto aggCmd = BSON("aggregate"
- << "test"
- << "pipeline" << BSON_ARRAY(BSON("returnMultipleCursors" << true)));
- std::vector<size_t> cursorIds{1, 2};
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", aggCmd, opCtx.get());
+ /**
+ * Ensure multiple batches works correctly
+ */
+ void MultipleBatchesWorksTest() {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ CursorId cursorId = 1;
- TaskExecutorCursor tec(&getExecutor(), rcr);
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
- ASSERT_BSONOBJ_EQ(aggCmd, scheduleSuccessfulMultiCursorResponse("firstBatch", 1, 2, cursorIds));
+ TaskExecutorCursor tec = makeTec(rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 3;
+ return opts;
+ }());
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
- auto cursorVec = tec.releaseAdditionalCursors();
- ASSERT_EQUALS(cursorVec.size(), 1);
+ // ASSERT(hasReadyRequests());
- // If we try to getNext() at this point, we are interruptible and can timeout
- ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
- ErrorCodes::ExceededTimeLimit,
- [&] { tec.getNext(opCtx.get()); }),
- DBException,
- ErrorCodes::ExceededTimeLimit);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
- // We can pick up after that interruption though
- ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
- << "test"),
- scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorIds[0]));
-
- // Repeat for second cursor.
- auto secondCursor = std::move(cursorVec[0]);
-
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 2);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 4);
-
- ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
- ErrorCodes::ExceededTimeLimit,
- [&] { secondCursor.getNext(opCtx.get()); }),
- DBException,
- ErrorCodes::ExceededTimeLimit);
-
- ASSERT_BSONOBJ_EQ(BSON("getMore" << 2LL << "collection"
- << "test"),
- scheduleSuccessfulCursorResponse("nextBatch", 6, 8, cursorIds[1]));
- // Read second batch on both cursors.
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3);
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4);
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 6);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 7);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 8);
-
- // Schedule EOF on both cursors.
- scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0);
- scheduleSuccessfulCursorResponse("nextBatch", 12, 12, 0);
-
- // Read final document.
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 6);
- ASSERT_EQUALS(secondCursor.getNext(opCtx.get()).get()["x"].Int(), 12);
-
- // Shouldn't have any more requests, both cursors are closed.
- ASSERT_FALSE(hasReadyRequests());
+ // If we try to getNext() at this point, we are interruptible and can timeout
+ ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { tec.getNext(opCtx.get()); }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
- ASSERT_FALSE(tec.getNext(opCtx.get()));
- ASSERT_FALSE(secondCursor.getNext(opCtx.get()));
-}
+ // We can pick up after that interruption though
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
+ << "test"
+ << "batchSize" << 3),
+ scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorId));
-/**
- * Ensure we work if find fails (and that we receive the error code it failed with)
- */
-TEST_F(TaskExecutorCursorFixture, FailureInFind) {
- const auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 2);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 3);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 4);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 5);
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+ cursorId = 0;
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 6, cursorId);
- TaskExecutorCursor tec(&getExecutor(), rcr);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 6);
- {
+ // We don't issue extra getmores after returning a 0 cursor id
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+ }
+
+ /**
+ * Ensure we allow empty firstBatch.
+ */
+ void EmptyFirstBatchTest() {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const auto getMoreCmd = BSON("getMore" << 1LL << "collection"
+ << "test"
+ << "batchSize" << 3);
+ const CursorId cursorId = 1;
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+
+ TaskExecutorCursor tec = makeTec(rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 3;
+ return opts;
+ }());
+
+ // Schedule a cursor response with an empty "firstBatch". Use end < start so we don't
+ // append any doc to "firstBatch".
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 0, cursorId));
+
+ stdx::thread th([&] {
+ // Wait for the getMore run by the getNext() below to be ready, and schedule a
+ // cursor response with a non-empty "nextBatch".
+ while (!hasReadyRequests()) {
+ sleepmillis(10);
+ }
+
+ ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 1, 1, 0));
+ });
+
+ // Verify that the first doc is the doc from the second batch.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
+
+ th.join();
+ }
+
+ /**
+ * Ensure we allow any empty non-initial batch.
+ */
+ void EmptyNonInitialBatchTest() {
+ const auto findCmd = BSON("find"
+ << "test"
+ << "batchSize" << 2);
+ const auto getMoreCmd = BSON("getMore" << 1LL << "collection"
+ << "test"
+ << "batchSize" << 3);
+ const CursorId cursorId = 1;
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+
+ TaskExecutorCursor tec = makeTec(rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 3;
+ return opts;
+ }());
+
+ // Schedule a cursor response with a non-empty "firstBatch".
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 1);
+
+ // Schedule two consecutive cursor responses with empty "nextBatch". Use end < start so
+ // we don't append any doc to "nextBatch".
+ ASSERT_BSONOBJ_EQ(getMoreCmd,
+ scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId));
+
+ stdx::thread th([&] {
+ // Wait for the first getMore run by the getNext() below to be ready, and schedule a
+ // cursor response with an empty "nextBatch".
+ while (!hasReadyRequests()) {
+ sleepmillis(10);
+ }
+
+ ASSERT_BSONOBJ_EQ(getMoreCmd,
+ scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId));
+
+ // Wait for the second getMore run by the getNext() below to be ready, and schedule a
+ // cursor response with a non-empty "nextBatch".
+ while (!hasReadyRequests()) {
+ sleepmillis(10);
+ }
+
+ ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 2, 2, 0));
+ });
+
+ // Verify that the next doc is the doc from the fourth batch.
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).value()["x"].Int(), 2);
+
+ th.join();
+ }
+
+ ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make();
+ ServiceContext::UniqueClient client;
+ ServiceContext::UniqueOperationContext opCtx;
+};
+
+class NonPinningTaskExecutorCursorTestFixture
+ : public TaskExecutorCursorTestFixture<NonPinningTaskExecutorCursorTestFixture,
+ ThreadPoolExecutorTest> {
+public:
+ void postSetUp() {
+ launchExecutorThread();
+ }
+
+ BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ size_t cursorId) {
NetworkInterfaceMock::InNetworkGuard ing(getNet());
+
ASSERT(getNet()->hasReadyRequests());
- getNet()->scheduleErrorResponse(Status(ErrorCodes::BadValue, "an error"));
+ auto rcr = getNet()->scheduleSuccessfulResponse(
+ buildCursorResponse(fieldName, start, end, cursorId));
getNet()->runReadyNetworkOperations();
+
+ return rcr.cmdObj.getOwned();
}
- ASSERT_THROWS_CODE(tec.getNext(opCtx.get()), DBException, ErrorCodes::BadValue);
-}
+ BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ std::vector<size_t> cursorIds) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
-/**
- * Ensure early termination of the cursor calls killCursor (if we know about the cursor id)
- */
-TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) {
- const auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 2);
- const CursorId cursorId = 1;
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+ ASSERT(getNet()->hasReadyRequests());
+ auto rcr = getNet()->scheduleSuccessfulResponse(
+ buildMultiCursorResponse(fieldName, start, end, cursorIds));
+ getNet()->runReadyNetworkOperations();
- {
- TaskExecutorCursor tec(&getExecutor(), rcr);
+ return rcr.cmdObj.getOwned();
+ }
- scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
+ BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
- ASSERT(tec.getNext(opCtx.get()));
- }
+ ASSERT(getNet()->hasReadyRequests());
+ auto rcr = getNet()->scheduleSuccessfulResponse(
+ BSON("cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound"
+ << BSONArray() << "cursorsAlive" << BSONArray() << "cursorsUnknown"
+ << BSONArray() << "ok" << 1));
+ getNet()->runReadyNetworkOperations();
- // Black hole the pending `getMore` operation scheduled by the `TaskExecutorCursor`.
- {
- NetworkInterfaceMock::InNetworkGuard guard(getNet());
- getNet()->blackHole(getNet()->getFrontOfUnscheduledQueue());
+ return rcr.cmdObj.getOwned();
}
- ASSERT_BSONOBJ_EQ(BSON("killCursors"
- << "test"
- << "cursors" << BSON_ARRAY(1)),
- scheduleSuccessfulKillCursorResponse(1));
-}
+ void scheduleErrorResponse(Status error) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
-/**
- * Ensure multiple batches works correctly
- */
-TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) {
- const auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 2);
- CursorId cursorId = 1;
+ ASSERT(getNet()->hasReadyRequests());
+ getNet()->scheduleErrorResponse(error);
+ getNet()->runReadyNetworkOperations();
+ }
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+ bool hasReadyRequests() {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+ return getNet()->hasReadyRequests();
+ }
- TaskExecutorCursor tec(&getExecutor(), rcr, [] {
- TaskExecutorCursor::Options opts;
- opts.batchSize = 3;
- return opts;
- }());
+ void blackHoleNextOutgoingRequest() {
+ NetworkInterfaceMock::InNetworkGuard guard(getNet());
+ getNet()->blackHole(getNet()->getFrontOfUnscheduledQueue());
+ }
- scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
+ TaskExecutorCursor makeTec(RemoteCommandRequest rcr,
+ TaskExecutorCursor::Options&& options = {}) {
+ return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options));
+ }
+};
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+class PinnedConnTaskExecutorCursorTestFixture
+ : public TaskExecutorCursorTestFixture<PinnedConnTaskExecutorCursorTestFixture,
+ PinnedConnectionTaskExecutorTest> {
+public:
+ void postSetUp() {}
+
+ BSONObj scheduleResponse(StatusWith<BSONObj> response) {
+ int32_t responseToId;
+ BSONObj cmdObjReceived;
+ auto pf = makePromiseFuture<void>();
+ expectSinkMessage([&](Message m) {
+ responseToId = m.header().getId();
+ auto opMsg = OpMsgRequest::parse(m);
+ cmdObjReceived = opMsg.body.removeField("$db").getOwned();
+ pf.promise.emplaceValue();
+ return Status::OK();
+ });
+ // Wait until we received the command request.
+ pf.future.get();
+
+ // Now we expect source message to be called and provide the response
+ expectSourceMessage([=]() {
+ rpc::OpMsgReplyBuilder replyBuilder;
+ replyBuilder.setCommandReply(response);
+ auto message = replyBuilder.done();
+ message.header().setResponseToMsgId(responseToId);
+ return message;
+ });
+ return cmdObjReceived;
+ }
- ASSERT(hasReadyRequests());
+ BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ size_t cursorId) {
+ auto cursorResponse = buildCursorResponse(fieldName, start, end, cursorId);
+ return scheduleResponse(cursorResponse);
+ }
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+ BSONObj scheduleSuccessfulMultiCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ std::vector<size_t> cursorIds) {
+ auto cursorResponse = buildMultiCursorResponse(fieldName, start, end, cursorIds);
+ return scheduleResponse(cursorResponse);
+ }
- // If we try to getNext() at this point, we are interruptible and can timeout
- ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
- ErrorCodes::ExceededTimeLimit,
- [&] { tec.getNext(opCtx.get()); }),
- DBException,
- ErrorCodes::ExceededTimeLimit);
+ void scheduleErrorResponse(Status error) {
+ scheduleResponse(error);
+ }
- // We can pick up after that interruption though
- ASSERT_BSONOBJ_EQ(BSON("getMore" << 1LL << "collection"
- << "test"
- << "batchSize" << 3),
- scheduleSuccessfulCursorResponse("nextBatch", 3, 5, cursorId));
+ BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) {
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3);
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4);
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5);
+ auto cursorResponse =
+ BSON("cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound"
+ << BSONArray() << "cursorsAlive" << BSONArray() << "cursorsUnknown"
+ << BSONArray() << "ok" << 1);
+ return scheduleResponse(cursorResponse);
+ }
- cursorId = 0;
- scheduleSuccessfulCursorResponse("nextBatch", 6, 6, cursorId);
+ TaskExecutorCursor makeTec(RemoteCommandRequest rcr,
+ TaskExecutorCursor::Options&& options = {}) {
+ options.pinConnection = true;
+ return TaskExecutorCursor(getExecutorPtr(), rcr, std::move(options));
+ }
- // We don't issue extra getmores after returning a 0 cursor id
- ASSERT_FALSE(hasReadyRequests());
+ bool hasReadyRequests() {
+ return asBase().hasReadyRequests();
+ }
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 6);
+ void blackHoleNextOutgoingRequest() {
+ auto pf = makePromiseFuture<void>();
+ expectSinkMessage([&](Message m) {
+ pf.promise.emplaceValue();
+ return Status(ErrorCodes::SocketException, "test");
+ });
+ pf.future.get();
+ }
+};
- ASSERT_FALSE(tec.getNext(opCtx.get()));
+TEST_F(NonPinningTaskExecutorCursorTestFixture, SingleBatchWorks) {
+ SingleBatchWorksTest();
}
-/**
- * Ensure we allow empty firstBatch.
- */
-TEST_F(TaskExecutorCursorFixture, EmptyFirstBatch) {
- const auto findCmd = BSON("find"
- << "test"
- << "batchSize" << 2);
- const auto getMoreCmd = BSON("getMore" << 1LL << "collection"
- << "test"
- << "batchSize" << 3);
- const CursorId cursorId = 1;
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, SingleBatchWorks) {
+ SingleBatchWorksTest();
+}
- RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) {
+ MultipleCursorsSingleBatchSucceedsTest();
+}
- TaskExecutorCursor tec(&getExecutor(), rcr, [] {
- TaskExecutorCursor::Options opts;
- opts.batchSize = 3;
- return opts;
- }());
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsSingleBatchSucceeds) {
+ MultipleCursorsSingleBatchSucceedsTest();
+}
- // Schedule a cursor response with an empty "firstBatch". Use end < start so we don't
- // append any doc to "firstBatch".
- ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 0, cursorId));
+TEST_F(NonPinningTaskExecutorCursorTestFixture,
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) {
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest();
+}
- stdx::thread th([&] {
- // Wait for the getMore run by the getNext() below to be ready, and schedule a
- // cursor response with a non-empty "nextBatch".
- while (!hasReadyRequests()) {
- sleepmillis(10);
- }
+TEST_F(PinnedConnTaskExecutorCursorTestFixture,
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructed) {
+ ChildTaskExecutorCursorsAreSafeIfOriginalOpCtxDestructedTest();
+}
+TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) {
+ MultipleCursorsGetMoreWorksTest();
+}
- ASSERT_BSONOBJ_EQ(getMoreCmd,
- scheduleSuccessfulCursorResponse("nextBatch", 1, 1, cursorId));
- });
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleCursorsGetMoreWorks) {
+ MultipleCursorsGetMoreWorksTest();
+}
- // Verify that the first doc is the doc from the second batch.
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+TEST_F(NonPinningTaskExecutorCursorTestFixture, FailureInFind) {
+ FailureInFindTest();
+}
- th.join();
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, FailureInFind) {
+ FailureInFindTest();
}
/**
- * Ensure we allow any empty non-initial batch.
+ * Ensure early termination of the cursor calls killCursor (if we know about the cursor id)
+ * Only applicable to the unpinned case - if the connection is pinned, and a getMore is
+ * in progress and/or fails, the most we can do is kill the connection. We can't re-use
+ * the connection to send killCursors.
*/
-TEST_F(TaskExecutorCursorFixture, EmptyNonInitialBatch) {
+TEST_F(NonPinningTaskExecutorCursorTestFixture, EarlyReturnKillsCursor) {
const auto findCmd = BSON("find"
<< "test"
<< "batchSize" << 2);
- const auto getMoreCmd = BSON("getMore" << 1LL << "collection"
- << "test"
- << "batchSize" << 3);
const CursorId cursorId = 1;
RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
- TaskExecutorCursor tec(&getExecutor(), rcr, [] {
- TaskExecutorCursor::Options opts;
- opts.batchSize = 3;
- return opts;
- }());
+ {
+ TaskExecutorCursor tec = makeTec(rcr);
- // Schedule a cursor response with a non-empty "firstBatch".
- ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 1, cursorId));
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, cursorId);
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+ ASSERT(tec.getNext(opCtx.get()));
- // Schedule two consecutive cursor responses with empty "nextBatch". Use end < start so
- // we don't append any doc to "nextBatch".
- ASSERT_BSONOBJ_EQ(getMoreCmd, scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId));
+ // Black hole the pending `getMore` operation scheduled by the `TaskExecutorCursor`.
+ blackHoleNextOutgoingRequest();
+ }
- stdx::thread th([&] {
- // Wait for the first getMore run by the getNext() below to be ready, and schedule a
- // cursor response with a non-empty "nextBatch".
- while (!hasReadyRequests()) {
- sleepmillis(10);
- }
- ASSERT_BSONOBJ_EQ(getMoreCmd,
- scheduleSuccessfulCursorResponse("nextBatch", 1, 0, cursorId));
+ ASSERT_BSONOBJ_EQ(BSON("killCursors"
+ << "test"
+ << "cursors" << BSON_ARRAY(1)),
+ scheduleSuccessfulKillCursorResponse(1));
+}
- // Wait for the second getMore run by the getNext() below to be ready, and schedule a
- // cursor response with a non-empty "nextBatch".
- while (!hasReadyRequests()) {
- sleepmillis(10);
- }
+TEST_F(NonPinningTaskExecutorCursorTestFixture, MultipleBatchesWorks) {
+ MultipleBatchesWorksTest();
+}
- ASSERT_BSONOBJ_EQ(getMoreCmd,
- scheduleSuccessfulCursorResponse("nextBatch", 2, 2, cursorId));
- });
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, MultipleBatchesWorks) {
+ MultipleBatchesWorksTest();
+}
+
+TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyFirstBatch) {
+ EmptyFirstBatchTest();
+}
+
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyFirstBatch) {
+ EmptyFirstBatchTest();
+}
- // Verify that the next doc is the doc from the fourth batch.
- ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+TEST_F(NonPinningTaskExecutorCursorTestFixture, EmptyNonInitialBatch) {
+ EmptyNonInitialBatchTest();
+}
- th.join();
+TEST_F(PinnedConnTaskExecutorCursorTestFixture, EmptyNonInitialBatch) {
+ EmptyNonInitialBatchTest();
}
/**
- * Ensure lsid is passed in all stages of querying
+ * Ensure the LSID is passed in all stages of querying. Need to test the
+ * pinning case separately because of difference around killCursor.
*/
-TEST_F(TaskExecutorCursorFixture, LsidIsPassed) {
+TEST_F(NonPinningTaskExecutorCursorTestFixture, LsidIsPassed) {
auto lsid = makeLogicalSessionIdForTest();
opCtx->setLogicalSessionId(lsid);
@@ -496,11 +767,11 @@ TEST_F(TaskExecutorCursorFixture, LsidIsPassed) {
RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
boost::optional<TaskExecutorCursor> tec;
- tec.emplace(&getExecutor(), rcr, []() {
+ tec.emplace(makeTec(rcr, []() {
TaskExecutorCursor::Options opts;
opts.batchSize = 1;
return opts;
- }());
+ }()));
// lsid in the first batch
ASSERT_BSONOBJ_EQ(BSON("find"
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 9c95467118f..900ed7508c8 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -231,6 +231,9 @@ private:
// Lifecycle state of this executor.
stdx::condition_variable _stateChange;
State _state = preStart;
+
+ friend std::shared_ptr<TaskExecutor> makePinnedConnectionTaskExecutor(
+ std::shared_ptr<TaskExecutor>);
};
} // namespace executor