diff options
Diffstat (limited to 'src/mongo/s/query/async_results_merger_test.cpp')
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 280 |
1 files changed, 2 insertions, 278 deletions
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 7960d22f018..b852cf33f79 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -30,18 +30,13 @@ #include "mongo/s/query/async_results_merger.h" -#include "mongo/client/remote_command_targeter_factory_mock.h" -#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/json.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" -#include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor.h" -#include "mongo/executor/thread_pool_task_executor_test_fixture.h" -#include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/s/query/results_merger_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" @@ -50,212 +45,11 @@ namespace mongo { namespace { -using executor::NetworkInterfaceMock; -using executor::RemoteCommandRequest; -using executor::RemoteCommandResponse; - -using ResponseStatus = executor::TaskExecutor::ResponseStatus; - -const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); -const std::vector<ShardId> kTestShardIds = { - ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")}; -const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345), - HostAndPort("FakeShard2Host", 12345), - HostAndPort("FakeShard3Host", 12345)}; - -const NamespaceString kTestNss("testdb.testcoll"); - LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj()); } -class AsyncResultsMergerTest : public ShardingTestFixture { -public: - AsyncResultsMergerTest() {} - - void setUp() override { - setRemote(HostAndPort("ClientHost", 12345)); - - configTargeter()->setFindHostReturnValue(kTestConfigShardHost); - - std::vector<ShardType> shards; - - for (size_t i = 0; i < kTestShardIds.size(); i++) { - ShardType shardType; - shardType.setName(kTestShardIds[i].toString()); - shardType.setHost(kTestShardHosts[i].toString()); - - shards.push_back(shardType); - - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i])); - targeter->setFindHostReturnValue(kTestShardHosts[i]); - - targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]), - std::move(targeter)); - } - - setupShards(shards); - } - -protected: - /** - * Constructs an ARM with the given vector of existing cursors. - * - * If 'findCmd' is not set, the default AsyncResultsMergerParams are used. - * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams. - * - * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the - * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.' - */ - std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors( - std::vector<RemoteCursor> remoteCursors, - boost::optional<BSONObj> findCmd = boost::none, - boost::optional<std::int64_t> getMoreBatchSize = boost::none) { - AsyncResultsMergerParams params; - params.setNss(kTestNss); - params.setRemotes(std::move(remoteCursors)); - - - if (findCmd) { - const auto qr = unittest::assertGet( - QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */)); - if (!qr->getSort().isEmpty()) { - params.setSort(qr->getSort().getOwned()); - } - - if (getMoreBatchSize) { - params.setBatchSize(getMoreBatchSize); - } else { - params.setBatchSize(qr->getBatchSize() - ? boost::optional<std::int64_t>( - static_cast<std::int64_t>(*qr->getBatchSize())) - : boost::none); - } - params.setTailableMode(qr->getTailableMode()); - params.setAllowPartialResults(qr->isAllowPartialResults()); - } - - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(operationContext()->getLogicalSessionId()); - sessionInfo.setTxnNumber(operationContext()->getTxnNumber()); - params.setOperationSessionInfo(sessionInfo); - - return stdx::make_unique<AsyncResultsMerger>( - operationContext(), executor(), std::move(params)); - } - - /** - * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition. - */ - void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) { - BSONObjBuilder viewDefBob; - viewDefBob.append("ns", ns); - viewDefBob.append("pipeline", fromjson(pipelineJsonArr)); - - BSONObjBuilder bob; - bob.append("resolvedView", viewDefBob.obj()); - bob.append("ok", 0.0); - bob.append("errmsg", "Command on view must be executed by mongos"); - bob.append("code", 169); - - std::vector<BSONObj> batch = {bob.obj()}; - scheduleNetworkResponseObjs(batch); - } - - /** - * Schedules a list of cursor responses to be returned by the mock network. - */ - void scheduleNetworkResponses(std::vector<CursorResponse> responses) { - std::vector<BSONObj> objs; - for (const auto& cursorResponse : responses) { - // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are - // subsequent responses, since the AsyncResultsMerger will only ever run getMores. - objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); - } - scheduleNetworkResponseObjs(objs); - } - - /** - * Schedules a list of raw BSON command responses to be returned by the mock network. - */ - void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - for (const auto& obj : objs) { - ASSERT_TRUE(net->hasReadyRequests()); - Milliseconds millis(0); - RemoteCommandResponse response(obj, millis); - executor::TaskExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); - } - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - RemoteCommandRequest getNthPendingRequest(size_t n) { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkInterfaceMock::NetworkOperationIterator noi = net->getNthUnscheduledRequest(n); - RemoteCommandRequest retRequest = noi->getRequest(); - net->exitNetwork(); - return retRequest; - } - - bool networkHasReadyRequests() { - NetworkInterfaceMock::InNetworkGuard guard(network()); - return guard->hasReadyRequests(); - } - - void scheduleErrorResponse(ResponseStatus rs) { - invariant(!rs.isOK()); - rs.elapsedMillis = Milliseconds(0); - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - void runReadyCallbacks() { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - net->runReadyNetworkOperations(); - net->exitNetwork(); - } - - void blackHoleNextRequest() { - executor::NetworkInterfaceMock* net = network(); - net->enterNetwork(); - ASSERT_TRUE(net->hasReadyRequests()); - net->blackHole(net->getNextReadyRequest()); - net->exitNetwork(); - } -}; - -void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) { - ASSERT_TRUE(killCmd.hasElement("killCursors")); - ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array); - - size_t numCursors = 0; - for (auto&& cursor : killCmd["cursors"].Obj()) { - ASSERT_EQ(cursor.type(), BSONType::NumberLong); - ASSERT_EQ(cursor.numberLong(), cursorId); - ++numCursors; - } - ASSERT_EQ(numCursors, 1u); -} - -RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { - RemoteCursor remoteCursor; - remoteCursor.setShardId(std::move(shardId)); - remoteCursor.setHostAndPort(std::move(host)); - remoteCursor.setCursorResponse(std::move(response)); - return remoteCursor; -} +using AsyncResultsMergerTest = ResultsMergerTestFixture; TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { std::vector<RemoteCursor> cursors; @@ -1888,76 +1682,6 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin executor()->waitForEvent(killEvent); } -TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Before any requests are scheduled, ARM is not ready to return results. - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); - - // Issue a blocking wait for the next result asynchronously on a different thread. - auto future = launchAsync([&]() { - auto next = unittest::assertGet(arm->blockingNext()); - ASSERT_FALSE(next.isEOF()); - ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); - next = unittest::assertGet(arm->blockingNext()); - ASSERT_TRUE(next.isEOF()); - }); - - // Schedule the response to the getMore which will return the next result and mark the cursor as - // exhausted. - onCommand([&](const auto& request) { - ASSERT(request.cmdObj["getMore"]); - return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) - .toBSON(CursorResponse::ResponseType::SubsequentResponse); - }); - - future.timed_get(kFutureTimeout); -} - -TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Issue a blocking wait for the next result asynchronously on a different thread. - auto future = launchAsync([&]() { - auto nextStatus = arm->blockingNext(); - ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); - }); - - // Now mark the OperationContext as killed from this thread. - { - stdx::lock_guard<Client> lk(*operationContext()->getClient()); - operationContext()->markKilled(ErrorCodes::Interrupted); - } - future.timed_get(kFutureTimeout); - // Be careful not to use a blocking kill here, since the main thread is in charge of running the - // callbacks, and we'd block on ourselves. - auto killEvent = arm->kill(operationContext()); - - assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1); - runReadyCallbacks(); - executor()->waitForEvent(killEvent); -} - -TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) { - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - auto arm = makeARMFromExistingCursors(std::move(cursors)); - - // Before any requests are scheduled, ARM is not ready to return results. - ASSERT_FALSE(arm->ready()); - ASSERT_FALSE(arm->remotesExhausted()); - - arm->blockingKill(operationContext()); -} - TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) { std::vector<RemoteCursor> cursors; cursors.emplace_back( |