diff options
author | Jason Carey <jcarey@argv.me> | 2019-04-03 13:41:49 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-04-10 18:42:49 -0400 |
commit | fcb3f6a8db7bc45a1f0345c320c9286afa0e4cc8 (patch) | |
tree | 7bc42587405465f6d3f6b4f51a6414ce0d775922 /src/mongo/executor | |
parent | 50697670712655d8fdd711e7fcdc59328bddb106 (diff) | |
download | mongo-fcb3f6a8db7bc45a1f0345c320c9286afa0e4cc8.tar.gz |
SERVER-40414 TaskExecutorCursor
Add support for a dbclientcursor style api which uses the task executor
to establish cursors, fetch batches and kill cursors.
This is to offer a simple api which is opCtx interruptible and uses the
newer connection pool code.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/SConscript | 40 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.cpp | 169 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor.h | 140 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_integration_test.cpp | 113 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_cursor_test.cpp | 308 |
5 files changed, 770 insertions, 0 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 0ff2eedeca7..364ff4772a4 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -260,3 +260,43 @@ env.Library( 'task_executor_interface', ], ) + +env.Library( + target='task_executor_cursor', + source=[ + 'task_executor_cursor.cpp', + ], + LIBDEPS=[ + 'task_executor_interface', + '$BUILD_DIR/mongo/db/query/command_request_response', + ], +) + +env.CppIntegrationTest( + target='task_executor_cursor_integration_test', + source=[ + 'task_executor_cursor_integration_test.cpp', + ], + LIBDEPS=[ + 'task_executor_cursor', + '$BUILD_DIR/mongo/client/clientdriver_network', + '$BUILD_DIR/mongo/db/wire_version', + '$BUILD_DIR/mongo/executor/network_interface_factory', + '$BUILD_DIR/mongo/executor/network_interface_thread_pool', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/transport/transport_layer_egress_init', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', + '$BUILD_DIR/mongo/util/version_impl', + ], +) + +env.CppUnitTest( + target='task_executor_cursor_test', + source=[ + 'task_executor_cursor_test.cpp', + ], + LIBDEPS=[ + 'task_executor_cursor', + 'thread_pool_task_executor_test_fixture', + ], +) diff --git a/src/mongo/executor/task_executor_cursor.cpp b/src/mongo/executor/task_executor_cursor.cpp new file mode 100644 index 00000000000..15dff7632f2 --- /dev/null +++ b/src/mongo/executor/task_executor_cursor.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/executor/task_executor_cursor.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/killcursors_request.h" + +namespace mongo { +namespace executor { + +TaskExecutorCursor::TaskExecutorCursor(executor::TaskExecutor* executor, + const RemoteCommandRequest& rcr, + Options&& options) + : _executor(executor), _rcr(rcr), _options(std::move(options)) { + + if (rcr.opCtx) { + _lsid = rcr.opCtx->getLogicalSessionId(); + } + + _runRemoteCommand(_createRequest(_rcr.opCtx, _rcr.cmdObj)); +} + +TaskExecutorCursor::~TaskExecutorCursor() { + try { + if (_cbHandle) { + _executor->cancel(*_cbHandle); + } + + if (_cursorId > 0) { + // We deliberately ignore failures to kill the cursor. This "best effort" is acceptable + // because some timeout mechanism on the remote host can be expected to reap it later. + // + // That timeout mechanism could be the default cursor timeout, or the logical session + // timeout if an lsid is used. + _executor + ->scheduleRemoteCommand( + _createRequest(nullptr, KillCursorsRequest(_ns, {_cursorId}).toBSON()), + [](const auto&) {}) + .isOK(); + } + } catch (const DBException&) { + } +} + +boost::optional<BSONObj> TaskExecutorCursor::getNext(OperationContext* opCtx) { + if (_batchIter == _batch.end()) { + _getNextBatch(opCtx); + } + + if (_batchIter == _batch.end()) { + return boost::none; + } + + return std::move(*_batchIter++); +} + +const RemoteCommandRequest& TaskExecutorCursor::_createRequest(OperationContext* opCtx, + const BSONObj& cmd) { + // we pull this every time for updated client metadata + _rcr.opCtx = opCtx; + + _rcr.cmdObj = [&] { + if (!_lsid) { + return cmd; + } + + BSONObjBuilder bob(cmd); + { + BSONObjBuilder subbob(bob.subobjStart("lsid")); + _lsid->serialize(&subbob); + subbob.done(); + } + + return bob.obj(); + }(); + + return _rcr; +} + +void TaskExecutorCursor::_runRemoteCommand(const RemoteCommandRequest& rcr) { + _cbHandle = uassertStatusOK(_executor->scheduleRemoteCommand( + rcr, [p = _pipe.producer](const TaskExecutor::RemoteCommandCallbackArgs& args) { + try { + if (args.response.isOK()) { + p.push(args.response.data); + } else { + p.push(args.response.status); + } + } catch (const DBException& ex) { + // If anything goes wrong, make sure we close the pipe to wake the caller of + // getNext() + p.close(); + } + })); +} + +void TaskExecutorCursor::_getNextBatch(OperationContext* opCtx) { + if (_cursorId == 0) { + return; + } + + // pull out of the pipe before setting cursor id so we don't spoil this object if we're opCtx + // interrupted + auto out = uassertStatusOK(_pipe.consumer.pop(opCtx)); + + // If we had a cursor id, set it to 0 so that we don't attempt to kill the cursor if there was + // an error + if (_cursorId > 0) { + _cursorId = 0; + } + + // if we've received a response from our last request (initial or getmore), our remote operation + // is done. + _cbHandle.reset(); + + auto cr = uassertStatusOK(CursorResponse::parseFromBSON(out)); + + // If this was our first batch + if (_cursorId == -1) { + _ns = cr.getNSS(); + _rcr.dbname = _ns.db().toString(); + } + + _cursorId = cr.getCursorId(); + _batch = cr.releaseBatch(); + _batchIter = _batch.begin(); + + // If we got a cursor id back, pre-fetch the next batch + if (_cursorId) { + _runRemoteCommand(_createRequest( + opCtx, GetMoreRequest(_ns, _cursorId, _options.batchSize, {}, {}, {}).toBSON())); + } +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/task_executor_cursor.h b/src/mongo/executor/task_executor_cursor.h new file mode 100644 index 00000000000..173229419c9 --- /dev/null +++ b/src/mongo/executor/task_executor_cursor.h @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <vector> + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/cursor_id.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/namespace_string.h" +#include "mongo/executor/remote_command_request.h" +#include "mongo/executor/task_executor.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/producer_consumer_queue.h" + +namespace mongo { +namespace executor { + +/** + * A synchronous cursor API for managing a remote cursor that uses an async task executor to run all + * stages of the command cursor protocol (initial command, getMore, killCursors) + * + * The main differentiator for this type over DBClientCursor is the use of a task executor (which + * provides access to a different connection pool, as well as interruptibility) and the ability to + * overlap getMores. This starts fetching the next batch as soon as one is exhausted (rather than + * on a call to getNext()). + */ +class TaskExecutorCursor { +public: + struct Options { + boost::optional<int64_t> batchSize; + }; + + /** + * Construct the cursor with a RemoteCommandRequest wrapping the initial command + * + * One value is carried over in successive calls to getMore/killCursor: + * + * opCtx - The Logical Session Id from the initial command is carried over in all later stages. + * NOTE - the actual command must not include the lsid + */ + explicit TaskExecutorCursor(executor::TaskExecutor* executor, + const RemoteCommandRequest& rcr, + Options&& options = {}); + + /** + * Asynchronously kills async ops and kills the underlying cursor on destruction. + */ + ~TaskExecutorCursor(); + + /** + * Returns the next document from this cursor until the cursor is exhausted (in which case we + * return an unset optional). This method can throw if there is an error running any commands, + * if the remote server returns a not ok command result, or if the passed in opCtx is + * interrupted (by killOp or maxTimeMS). + * + * The opCtx may also be used as the source of client metadata if this call to getNext() + * triggers a new getMore to fetch the next batch. + */ + boost::optional<BSONObj> getNext(OperationContext* opCtx); + +private: + /** + * Runs a remote command and pipes the output back to this object + */ + void _runRemoteCommand(const RemoteCommandRequest& rcr); + + /** + * Gets the next batch with interruptibility via the opCtx + */ + void _getNextBatch(OperationContext* opCtx); + + /** + * Create a new request, annotating with lsid and current opCtx + */ + const RemoteCommandRequest& _createRequest(OperationContext* opCtx, const BSONObj& cmd); + + executor::TaskExecutor* _executor; + + // Used as a scratch pad for the successive scheduleRemoteCommand calls + RemoteCommandRequest _rcr; + + const Options _options; + + // If the opCtx is in our initial request, re-use it for all subsequent operations + boost::optional<LogicalSessionId> _lsid; + + // Stash the callbackhandle for the current outstanding operation + boost::optional<TaskExecutor::CallbackHandle> _cbHandle; + + // cursor id has 1 of 3 states. + // + // <1 - We haven't yet received a response for our initial request + // 0 - Cursor is done (errored or consumed) + // >1 - Cursor is live on the remote + CursorId _cursorId = -1; + + // Namespace after we resolved the initial request + NamespaceString _ns; + + // Storage for the last batch we fetched + std::vector<BSONObj> _batch; + decltype(_batch)::iterator _batchIter; + + // Multi producer because we hold onto the producer side in this object, as well as placing it + // into callbacks for the task executor + MultiProducerSingleConsumerQueue<StatusWith<BSONObj>>::Pipe _pipe; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/task_executor_cursor_integration_test.cpp b/src/mongo/executor/task_executor_cursor_integration_test.cpp new file mode 100644 index 00000000000..e65068990dc --- /dev/null +++ b/src/mongo/executor/task_executor_cursor_integration_test.cpp @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/task_executor_cursor.h" + +#include "mongo/client/dbclient_base.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/unittest/integration_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace executor { +namespace { + +class TaskExecutorCursorFixture : public mongo::unittest::Test { +public: + void setUp() override { + std::shared_ptr<NetworkInterface> ni = makeNetworkInterface("TaskExecutorCursorTest"); + auto tp = std::make_unique<NetworkInterfaceThreadPool>(ni.get()); + + _executor = std::make_unique<ThreadPoolTaskExecutor>(std::move(tp), std::move(ni)); + _executor->startup(); + }; + + void tearDown() override { + _executor->shutdown(); + _executor.reset(); + }; + + TaskExecutor* executor() { + return _executor.get(); + } + + ServiceContext::UniqueServiceContext _serviceCtx = ServiceContext::make(); + std::unique_ptr<ThreadPoolTaskExecutor> _executor; +}; + + +// Test that we can actually use a TaskExecutorCursor to read multiple batches from a remote host +TEST_F(TaskExecutorCursorFixture, Basic) { + auto client = _serviceCtx->makeClient("TaskExecutorCursorTest"); + auto opCtx = client->makeOperationContext(); + + // Write 100 documents to "test.test" via dbclient + std::string err; + auto dbclient = unittest::getFixtureConnectionString().connect("TaskExecutorCursorTest", err); + + const size_t numDocs = 100; + + std::vector<BSONObj> docs; + docs.reserve(numDocs); + for (size_t i = 0; i < numDocs; ++i) { + docs.emplace_back(BSON("x" << int(i))); + } + dbclient->dropCollection("test.test"); + dbclient->insert("test.test", docs); + ASSERT_EQUALS(dbclient->count("test.test"), numDocs); + + RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(), + "test", + BSON("find" + << "test" + << "batchSize" + << 10), + opCtx.get()); + + TaskExecutorCursor tec(executor(), rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 10; + return opts; + }()); + + size_t count = 0; + while (auto doc = tec.getNext(opCtx.get())) { + count++; + } + + ASSERT_EQUALS(count, numDocs); +} + +} // namespace +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/task_executor_cursor_test.cpp b/src/mongo/executor/task_executor_cursor_test.cpp new file mode 100644 index 00000000000..b1390e3b253 --- /dev/null +++ b/src/mongo/executor/task_executor_cursor_test.cpp @@ -0,0 +1,308 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/executor/task_executor_cursor.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/unittest/bson_test_util.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace executor { +namespace { + +/** + * Fixture for the task executor cursor tests which offers some convenience methods to help with + * scheduling responses + */ +class TaskExecutorCursorFixture : public ThreadPoolExecutorTest { +public: + void setUp() override { + ThreadPoolExecutorTest::setUp(); + + client = serviceCtx->makeClient("TaskExecutorCursorTest"); + opCtx = client->makeOperationContext(); + + launchExecutorThread(); + } + + void tearDown() override { + opCtx.reset(); + client.reset(); + + ThreadPoolExecutorTest::tearDown(); + } + + BSONObj scheduleSuccessfulCursorResponse(StringData fieldName, + size_t start, + size_t end, + size_t cursorId) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + + BSONObjBuilder bob; + { + BSONObjBuilder cursor(bob.subobjStart("cursor")); + { + BSONArrayBuilder batch(cursor.subarrayStart(fieldName)); + + for (size_t i = start; i <= end; ++i) { + BSONObjBuilder doc(batch.subobjStart()); + doc.append("x", int(i)); + } + } + cursor.append("id", (long long)(cursorId)); + cursor.append("ns", "test.test"); + } + bob.append("ok", int(1)); + + ASSERT(getNet()->hasReadyRequests()); + auto rcr = getNet()->scheduleSuccessfulResponse(bob.obj()); + getNet()->runReadyNetworkOperations(); + + return rcr.cmdObj.getOwned(); + } + + BSONObj scheduleSuccessfulKillCursorResponse(size_t cursorId) { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + + ASSERT(getNet()->hasReadyRequests()); + auto rcr = getNet()->scheduleSuccessfulResponse(BSON( + "cursorsKilled" << BSON_ARRAY((long long)(cursorId)) << "cursorsNotFound" << BSONArray() + << "cursorsAlive" + << BSONArray() + << "cursorsUnknown" + << BSONArray() + << "ok" + << 1)); + getNet()->runReadyNetworkOperations(); + + return rcr.cmdObj.getOwned(); + } + + bool hasReadyRequests() { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + return getNet()->hasReadyRequests(); + } + + ServiceContext::UniqueServiceContext serviceCtx = ServiceContext::make(); + ServiceContext::UniqueClient client; + ServiceContext::UniqueOperationContext opCtx; +}; + +/** + * Ensure we work for a single simple batch + */ +TEST_F(TaskExecutorCursorFixture, SingleBatchWorks) { + auto findCmd = BSON("find" + << "test" + << "batchSize" + << 2); + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr); + + ASSERT_BSONOBJ_EQ(findCmd, scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 0)); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); + + ASSERT_FALSE(hasReadyRequests()); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2); + + ASSERT_FALSE(tec.getNext(opCtx.get())); +} + +/** + * Ensure we work if find fails (and that we receive the error code it failed with) + */ +TEST_F(TaskExecutorCursorFixture, FailureInFind) { + RemoteCommandRequest rcr(HostAndPort("localhost"), + "test", + BSON("find" + << "test" + << "batchSize" + << 2), + opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr); + + { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + + ASSERT(getNet()->hasReadyRequests()); + getNet()->scheduleErrorResponse(Status(ErrorCodes::BadValue, "an error")); + getNet()->runReadyNetworkOperations(); + } + + ASSERT_THROWS_CODE(tec.getNext(opCtx.get()), DBException, ErrorCodes::BadValue); +} + +/** + * Ensure early termination of the cursor calls killCursor (if we know about the cursor id) + */ +TEST_F(TaskExecutorCursorFixture, EarlyReturnKillsCursor) { + RemoteCommandRequest rcr(HostAndPort("localhost"), + "test", + BSON("find" + << "test" + << "batchSize" + << 2), + opCtx.get()); + + { + TaskExecutorCursor tec(&getExecutor(), rcr); + + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1); + + ASSERT(tec.getNext(opCtx.get())); + } + + ASSERT_BSONOBJ_EQ(BSON("killCursors" + << "test" + << "cursors" + << BSON_ARRAY(1)), + scheduleSuccessfulKillCursorResponse(1)); +} + +/** + * Ensure multiple batches works correctly + */ +TEST_F(TaskExecutorCursorFixture, MultipleBatchesWorks) { + RemoteCommandRequest rcr(HostAndPort("localhost"), + "test", + BSON("find" + << "test" + << "batchSize" + << 2), + opCtx.get()); + + TaskExecutorCursor tec(&getExecutor(), rcr, [] { + TaskExecutorCursor::Options opts; + opts.batchSize = 3; + return opts; + }()); + + scheduleSuccessfulCursorResponse("firstBatch", 1, 2, 1); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 1); + + ASSERT(hasReadyRequests()); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 2); + + // If we try to getNext() at this point, we are interruptible and can timeout + ASSERT_THROWS_CODE(opCtx->runWithDeadline(Date_t::now() + Milliseconds(100), + ErrorCodes::ExceededTimeLimit, + [&] { tec.getNext(opCtx.get()); }), + DBException, + ErrorCodes::ExceededTimeLimit); + + // We can pick up after that interruption though + ASSERT_BSONOBJ_EQ(BSON("getMore" << long(1) << "collection" + << "test" + << "batchSize" + << 3), + scheduleSuccessfulCursorResponse("nextBatch", 3, 5, 1)); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 3); + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 4); + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 5); + + scheduleSuccessfulCursorResponse("nextBatch", 6, 6, 0); + + // We don't issue extra getmores after returning a 0 cursor id + ASSERT_FALSE(hasReadyRequests()); + + ASSERT_EQUALS(tec.getNext(opCtx.get()).get()["x"].Int(), 6); + + ASSERT_FALSE(tec.getNext(opCtx.get())); +} + +/** + * Ensure lsid is passed in all stages of querying + */ +TEST_F(TaskExecutorCursorFixture, LsidIsPassed) { + auto lsid = makeLogicalSessionIdForTest(); + opCtx->setLogicalSessionId(lsid); + + auto findCmd = BSON("find" + << "test" + << "batchSize" + << 1); + + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", findCmd, opCtx.get()); + + boost::optional<TaskExecutorCursor> tec; + tec.emplace(&getExecutor(), rcr, []() { + TaskExecutorCursor::Options opts; + opts.batchSize = 1; + return opts; + }()); + + // lsid in the first batch + ASSERT_BSONOBJ_EQ(BSON("find" + << "test" + << "batchSize" + << 1 + << "lsid" + << lsid.toBSON()), + scheduleSuccessfulCursorResponse("firstBatch", 1, 1, 1)); + + ASSERT_EQUALS(tec->getNext(opCtx.get()).get()["x"].Int(), 1); + + // lsid in the getmore + ASSERT_BSONOBJ_EQ(BSON("getMore" << long(1) << "collection" + << "test" + << "batchSize" + << 1 + << "lsid" + << lsid.toBSON()), + scheduleSuccessfulCursorResponse("nextBatch", 2, 2, 1)); + + tec.reset(); + + // lsid in the killcursor + ASSERT_BSONOBJ_EQ(BSON("killCursors" + << "test" + << "cursors" + << BSON_ARRAY(1) + << "lsid" + << lsid.toBSON()), + scheduleSuccessfulKillCursorResponse(1)); + + ASSERT_FALSE(hasReadyRequests()); +} + +} // namespace +} // namespace executor +} // namespace mongo |