summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2019-04-03 13:41:49 -0400
committerJason Carey <jcarey@argv.me>2019-04-10 18:42:49 -0400
commitfcb3f6a8db7bc45a1f0345c320c9286afa0e4cc8 (patch)
tree7bc42587405465f6d3f6b4f51a6414ce0d775922 /src/mongo/executor
parent50697670712655d8fdd711e7fcdc59328bddb106 (diff)
downloadmongo-fcb3f6a8db7bc45a1f0345c320c9286afa0e4cc8.tar.gz
SERVER-40414 TaskExecutorCursor
Add support for a dbclientcursor style api which uses the task executor to establish cursors, fetch batches and kill cursors. This is to offer a simple api which is opCtx interruptible and uses the newer connection pool code.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/SConscript40
-rw-r--r--src/mongo/executor/task_executor_cursor.cpp169
-rw-r--r--src/mongo/executor/task_executor_cursor.h140
-rw-r--r--src/mongo/executor/task_executor_cursor_integration_test.cpp113
-rw-r--r--src/mongo/executor/task_executor_cursor_test.cpp308
5 files changed, 770 insertions, 0 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 0ff2eedeca7..364ff4772a4 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -260,3 +260,43 @@ env.Library(
'task_executor_interface',
],
)
+
+env.Library(
+ target='task_executor_cursor',
+ source=[
+ 'task_executor_cursor.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_interface',
+ '$BUILD_DIR/mongo/db/query/command_request_response',
+ ],
+)
+
+env.CppIntegrationTest(
+ target='task_executor_cursor_integration_test',
+ source=[
+ 'task_executor_cursor_integration_test.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_cursor',
+ '$BUILD_DIR/mongo/client/clientdriver_network',
+ '$BUILD_DIR/mongo/db/wire_version',
+ '$BUILD_DIR/mongo/executor/network_interface_factory',
+ '$BUILD_DIR/mongo/executor/network_interface_thread_pool',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ '$BUILD_DIR/mongo/transport/transport_layer_egress_init',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
+ '$BUILD_DIR/mongo/util/version_impl',
+ ],
+)
+
+env.CppUnitTest(
+ target='task_executor_cursor_test',
+ source=[
+ 'task_executor_cursor_test.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_cursor',
+ 'thread_pool_task_executor_test_fixture',
+ ],
+)
diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp
new file mode 100644
index 00000000000..15dff7632f2
--- /dev/null
+++ b/src/mongo/executor/task_executor_cursor.cpp
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/task_executor_cursor.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/killcursors_request.h"
+
+namespace mongo {
+namespace executor {
+
+TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor,
+ const RemoteCommandRequest& rcr,
+ Options&& options)
+ : _executor(executor), _rcr(rcr), _options(std::move(options)) {
+
+ if (rcr.opCtx) {
+ _lsid = rcr.opCtx->getLogicalSessionId();
+ }
+
+ _runRemoteCommand(_createRequest(_rcr.opCtx, _rcr.cmdObj));
+}
+
+TaskExecutorCursor::~TaskExecutorCursor() {
+ try {
+ if (_cbHandle) {
+ _executor->cancel(*_cbHandle);
+ }
+
+ if (_cursorId > 0) {
+ // We deliberately ignore failures to kill the cursor. This "best effort" is acceptable
+ // because some timeout mechanism on the remote host can be expected to reap it later.
+ //
+ // That timeout mechanism could be the default cursor timeout, or the logical session
+ // timeout if an lsid is used.
+ _executor
+ ->scheduleRemoteCommand(
+ _createRequest(nullptr, KillCursorsRequest(_ns, {_cursorId}).toBSON()),
+ [](const auto&) {})
+ .isOK();
+ }
+ } catch (const DBException&) {
+ }
+}
+
+boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) {
+ if (_batchIter == _batch.end()) {
+ _getNextBatch(opCtx);
+ }
+
+ if (_batchIter == _batch.end()) {
+ return boost::none;
+ }
+
+ return std::move(*_batchIter++);
+}
+
+const RemoteCommandRequest& TaskExecutorCursor::_createRequest(OperationContext* opCtx,
+ const BSONObj& cmd) {
+ // we pull this every time for updated client metadata
+ _rcr.opCtx = opCtx;
+
+ _rcr.cmdObj = [&] {
+ if (!_lsid) {
+ return cmd;
+ }
+
+ BSONObjBuilder bob(cmd);
+ {
+ BSONObjBuilder subbob(bob.subobjStart("lsid"));
+ _lsid->serialize(&subbob);
+ subbob.done();
+ }
+
+ return bob.obj();
+ }();
+
+ return _rcr;
+}
+
+void TaskExecutorCursor::_runRemoteCommand(const RemoteCommandRequest& rcr) {
+ _cbHandle = uassertStatusOK(_executor->scheduleRemoteCommand(
+ rcr, [p = _pipe.producer](const TaskExecutor::RemoteCommandCallbackArgs& args) {
+ try {
+ if (args.response.isOK()) {
+ p.push(args.response.data);
+ } else {
+ p.push(args.response.status);
+ }
+ } catch (const DBException& ex) {
+ // If anything goes wrong, make sure we close the pipe to wake the caller of
+ // getNext()
+ p.close();
+ }
+ }));
+}
+
+void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) {
+ if (_cursorId == 0) {
+ return;
+ }
+
+ // pull out of the pipe before setting cursor id so we don't spoil this object if we're opCtx
+ // interrupted
+ auto out = uassertStatusOK(_pipe.consumer.pop(opCtx));
+
+ // If we had a cursor id, set it to 0 so that we don't attempt to kill the cursor if there was
+ // an error
+ if (_cursorId > 0) {
+ _cursorId = 0;
+ }
+
+ // if we've received a response from our last request (initial or getmore), our remote operation
+ // is done.
+ _cbHandle.reset();
+
+ auto cr = uassertStatusOK(CursorResponse::parseFromBSON(out));
+
+ // If this was our first batch
+ if (_cursorId == -1) {
+ _ns = cr.getNSS();
+ _rcr.dbname = _ns.db().toString();
+ }
+
+ _cursorId = cr.getCursorId();
+ _batch = cr.releaseBatch();
+ _batchIter = _batch.begin();
+
+ // If we got a cursor id back, pre-fetch the next batch
+ if (_cursorId) {
+ _runRemoteCommand(_createRequest(
+ opCtx, GetMoreRequest(_ns, _cursorId, _options.batchSize, {}, {}, {}).toBSON()));
+ }
+}
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h
new file mode 100644
index 00000000000..173229419c9
--- /dev/null
+++ b/src/mongo/executor/task_executor_cursor.h
@@ -0,0 +1,140 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <vector>
+
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/cursor_id.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/executor/remote_command_request.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/producer_consumer_queue.h"
+
+namespace mongo {
+namespace executor {
+
+/**
+ * A synchronous cursor API for managing a remote cursor that uses an async task executor to run all
+ * stages of the command cursor protocol (initial command, getMore, killCursors)
+ *
+ * The main differentiator for this type over DBClientCursor is the use of a task executor (which
+ * provides access to a different connection pool, as well as interruptibility) and the ability to
+ * overlap getMores. This starts fetching the next batch as soon as one is exhausted (rather than
+ * on a call to getNext()).
+ */
+class TaskExecutorCursor {
+public:
+ struct Options {
+ boost::optional<int64_t> batchSize;
+ };
+
+ /**
+ * Construct the cursor with a RemoteCommandRequest wrapping the initial command
+ *
+ * One value is carried over in successive calls to getMore/killCursor:
+ *
+ * opCtx - The Logical Session Id from the initial command is carried over in all later stages.
+ * NOTE - the actual command must not include the lsid
+ */
+ explicit TaskExecutorCursor(executor::TaskExecutor* executor,
+ const RemoteCommandRequest& rcr,
+ Options&& options = {});
+
+ /**
+ * Asynchronously kills async ops and kills the underlying cursor on destruction.
+ */
+ ~TaskExecutorCursor();
+
+ /**
+ * Returns the next document from this cursor until the cursor is exhausted (in which case we
+ * return an unset optional). This method can throw if there is an error running any commands,
+ * if the remote server returns a not ok command result, or if the passed in opCtx is
+ * interrupted (by killOp or maxTimeMS).
+ *
+ * The opCtx may also be used as the source of client metadata if this call to getNext()
+ * triggers a new getMore to fetch the next batch.
+ */
+ boost::optional<BSONObj> getNext(OperationContext* opCtx);
+
+private:
+ /**
+ * Runs a remote command and pipes the output back to this object
+ */
+ void _runRemoteCommand(const RemoteCommandRequest& rcr);
+
+ /**
+ * Gets the next batch with interruptibility via the opCtx
+ */
+ void _getNextBatch(OperationContext* opCtx);
+
+ /**
+ * Create a new request, annotating with lsid and current opCtx
+ */
+ const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd);
+
+ executor::TaskExecutor* _executor;
+
+ // Used as a scratch pad for the successive scheduleRemoteCommand calls
+ RemoteCommandRequest _rcr;
+
+ const Options _options;
+
+ // If the opCtx is in our initial request, re-use it for all subsequent operations
+ boost::optional<LogicalSessionId> _lsid;
+
+ // Stash the callbackhandle for the current outstanding operation
+ boost::optional<TaskExecutor::CallbackHandle> _cbHandle;
+
+ // cursor id has 1 of 3 states.
+ //
+ // <1 - We haven't yet received a response for our initial request
+ // 0 - Cursor is done (errored or consumed)
+ // >1 - Cursor is live on the remote
+ CursorId _cursorId = -1;
+
+ // Namespace after we resolved the initial request
+ NamespaceString _ns;
+
+ // Storage for the last batch we fetched
+ std::vector<BSONObj> _batch;
+ decltype(_batch)::iterator _batchIter;
+
+ // Multi producer because we hold onto the producer side in this object, as well as placing it
+ // into callbacks for the task executor
+ MultiProducerSingleConsumerQueue<StatusWith<BSONObj>>::Pipe _pipe;
+};
+
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor_cursor_integration_test.cpp b/src/mongo/executor/task_executor_cursor_integration_test.cpp
new file mode 100644
index 00000000000..e65068990dc
--- /dev/null
+++ b/src/mongo/executor/task_executor_cursor_integration_test.cpp
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/task_executor_cursor.h"
+
+#include "mongo/client/dbclient_base.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+class TaskExecutorCursorFixture : public mongo::unittest::Test {
+public:
+ void setUp() override {
+ std::shared_ptr<NetworkInterface> ni = makeNetworkInterface("TaskExecutorCursorTest");
+ auto tp = std::make_unique<NetworkInterfaceThreadPool>(ni.get());
+
+ _executor = std::make_unique<ThreadPoolTaskExecutor>(std::move(tp), std::move(ni));
+ _executor->startup();
+ };
+
+ void tearDown() override {
+ _executor->shutdown();
+ _executor.reset();
+ };
+
+ TaskExecutor* executor() {
+ return _executor.get();
+ }
+
+ ServiceContext::UniqueServiceContext _serviceCtx = ServiceContext::make();
+ std::unique_ptr<ThreadPoolTaskExecutor> _executor;
+};
+
+
+// Test that we can actually use a TaskExecutorCursor to read multiple batches from a remote host
+TEST_F(TaskExecutorCursorFixture, Basic) {
+ auto client = _serviceCtx->makeClient("TaskExecutorCursorTest");
+ auto opCtx = client->makeOperationContext();
+
+ // Write 100 documents to "test.test" via dbclient
+ std::string err;
+ auto dbclient = unittest::getFixtureConnectionString().connect("TaskExecutorCursorTest", err);
+
+ const size_t numDocs = 100;
+
+ std::vector<BSONObj> docs;
+ docs.reserve(numDocs);
+ for (size_t i = 0; i < numDocs; ++i) {
+ docs.emplace_back(BSON("x" << int(i)));
+ }
+ dbclient->dropCollection("test.test");
+ dbclient->insert("test.test", docs);
+ ASSERT_EQUALS(dbclient->count("test.test"), numDocs);
+
+ RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(),
+ "test",
+ BSON("find"
+ << "test"
+ << "batchSize"
+ << 10),
+ opCtx.get());
+
+ TaskExecutorCursor tec(executor(), rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 10;
+ return opts;
+ }());
+
+ size_t count = 0;
+ while (auto doc = tec.getNext(opCtx.get())) {
+ count++;
+ }
+
+ ASSERT_EQUALS(count, numDocs);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp
new file mode 100644
index 00000000000..b1390e3b253
--- /dev/null
+++ b/src/mongo/executor/task_executor_cursor_test.cpp
@@ -0,0 +1,308 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/task_executor_cursor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/unittest/bson_test_util.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+/**
+ * Fixture for the task executor cursor tests which offers some convenience methods to help with
+ * scheduling responses
+ */
+class TaskExecutorCursorFixture : public ThreadPoolExecutorTest {
+public:
+ void setUp() override {
+ ThreadPoolExecutorTest::setUp();
+
+ client = serviceCtx->makeClient("TaskExecutorCursorTest");
+ opCtx = client->makeOperationContext();
+
+ launchExecutorThread();
+ }
+
+ void tearDown() override {
+ opCtx.reset();
+ client.reset();
+
+ ThreadPoolExecutorTest::tearDown();
+ }
+
+ BSONObj scheduleSuccessfulCursorResponse(StringData fieldName,
+ size_t start,
+ size_t end,
+ size_t cursorId) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+
+ BSONObjBuilder bob;
+ {
+ BSONObjBuilder cursor(bob.subobjStart("cursor"));
+ {
+ BSONArrayBuilder batch(cursor.subarrayStart(fieldName));
+
+ for (size_t i = start; i <= end; ++i) {
+ BSONObjBuilder doc(batch.subobjStart());
+ doc.append("x", int(i));
+ }
+ }
+ cursor.append("id", (long long)(cursorId));
+ cursor.append("ns", "test.test");
+ }
+ bob.append("ok", int(1));
+
+ ASSERT(getNet()->hasReadyRequests());
+ auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj());
+ getNet()->runReadyNetworkOperations();
+
+ return rcr.cmdObj.getOwned();
+ }
+
+ BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+
+ ASSERT(getNet()->hasReadyRequests());
+ auto rcr = getNet()->scheduleSuccessfulResponse(BSON(
+ "cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound" << BSONArray()
+ << "cursorsAlive"
+ << BSONArray()
+ << "cursorsUnknown"
+ << BSONArray()
+ << "ok"
+ << 1));
+ getNet()->runReadyNetworkOperations();
+
+ return rcr.cmdObj.getOwned();
+ }
+
+ bool hasReadyRequests() {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+ return getNet()->hasReadyRequests();
+ }
+
+ ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make();
+ ServiceContext::UniqueClient client;
+ ServiceContext::UniqueOperationContext opCtx;
+};
+
+/**
+ * Ensure we work for a single simple batch
+ */
+TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) {
+ auto findCmd = BSON("find"
+ << "test"
+ << "batchSize"
+ << 2);
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr);
+
+ ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 0));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+}
+
+/**
+ * Ensure we work if find fails (and that we receive the error code it failed with)
+ */
+TEST_F(TaskExecutorCursorFixture, FailureInFind) {
+ RemoteCommandRequest rcr(HostAndPort("localhost"),
+ "test",
+ BSON("find"
+ << "test"
+ << "batchSize"
+ << 2),
+ opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr);
+
+ {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+
+ ASSERT(getNet()->hasReadyRequests());
+ getNet()->scheduleErrorResponse(Status(ErrorCodes::BadValue, "an error"));
+ getNet()->runReadyNetworkOperations();
+ }
+
+ ASSERT_THROWS_CODE(tec.getNext(opCtx.get()), DBException, ErrorCodes::BadValue);
+}
+
+/**
+ * Ensure early termination of the cursor calls killCursor (if we know about the cursor id)
+ */
+TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) {
+ RemoteCommandRequest rcr(HostAndPort("localhost"),
+ "test",
+ BSON("find"
+ << "test"
+ << "batchSize"
+ << 2),
+ opCtx.get());
+
+ {
+ TaskExecutorCursor tec(&getExecutor(), rcr);
+
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1);
+
+ ASSERT(tec.getNext(opCtx.get()));
+ }
+
+ ASSERT_BSONOBJ_EQ(BSON("killCursors"
+ << "test"
+ << "cursors"
+ << BSON_ARRAY(1)),
+ scheduleSuccessfulKillCursorResponse(1));
+}
+
+/**
+ * Ensure multiple batches works correctly
+ */
+TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) {
+ RemoteCommandRequest rcr(HostAndPort("localhost"),
+ "test",
+ BSON("find"
+ << "test"
+ << "batchSize"
+ << 2),
+ opCtx.get());
+
+ TaskExecutorCursor tec(&getExecutor(), rcr, [] {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 3;
+ return opts;
+ }());
+
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1);
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ ASSERT(hasReadyRequests());
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2);
+
+ // If we try to getNext() at this point, we are interruptible and can timeout
+ ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100),
+ ErrorCodes::ExceededTimeLimit,
+ [&] { tec.getNext(opCtx.get()); }),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+
+ // We can pick up after that interruption though
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << long(1) << "collection"
+ << "test"
+ << "batchSize"
+ << 3),
+ scheduleSuccessfulCursorResponse("nextBatch", 3, 5, 1));
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4);
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5);
+
+ scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0);
+
+ // We don't issue extra getmores after returning a 0 cursor id
+ ASSERT_FALSE(hasReadyRequests());
+
+ ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 6);
+
+ ASSERT_FALSE(tec.getNext(opCtx.get()));
+}
+
+/**
+ * Ensure lsid is passed in all stages of querying
+ */
+TEST_F(TaskExecutorCursorFixture, LsidIsPassed) {
+ auto lsid = makeLogicalSessionIdForTest();
+ opCtx->setLogicalSessionId(lsid);
+
+ auto findCmd = BSON("find"
+ << "test"
+ << "batchSize"
+ << 1);
+
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get());
+
+ boost::optional<TaskExecutorCursor> tec;
+ tec.emplace(&getExecutor(), rcr, []() {
+ TaskExecutorCursor::Options opts;
+ opts.batchSize = 1;
+ return opts;
+ }());
+
+ // lsid in the first batch
+ ASSERT_BSONOBJ_EQ(BSON("find"
+ << "test"
+ << "batchSize"
+ << 1
+ << "lsid"
+ << lsid.toBSON()),
+ scheduleSuccessfulCursorResponse("firstBatch", 1, 1, 1));
+
+ ASSERT_EQUALS(tec->getNext(opCtx.get()).get()["x"].Int(), 1);
+
+ // lsid in the getmore
+ ASSERT_BSONOBJ_EQ(BSON("getMore" << long(1) << "collection"
+ << "test"
+ << "batchSize"
+ << 1
+ << "lsid"
+ << lsid.toBSON()),
+ scheduleSuccessfulCursorResponse("nextBatch", 2, 2, 1));
+
+ tec.reset();
+
+ // lsid in the killcursor
+ ASSERT_BSONOBJ_EQ(BSON("killCursors"
+ << "test"
+ << "cursors"
+ << BSON_ARRAY(1)
+ << "lsid"
+ << lsid.toBSON()),
+ scheduleSuccessfulKillCursorResponse(1));
+
+ ASSERT_FALSE(hasReadyRequests());
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo