/** * Copyright 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/s/query/async_results_merger.h" #include "mongo/db/json.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/task_executor.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { using executor::NetworkInterfaceMock; using executor::RemoteCommandRequest; using executor::RemoteCommandResponse; class AsyncResultsMergerTest : public executor::ThreadPoolExecutorTest { public: AsyncResultsMergerTest() : _nss("testdb.testcoll"), _remotes({HostAndPort("localhost", -1), HostAndPort("localhost", -2), HostAndPort("localhost", -3)}) {} void setUp() final { ThreadPoolExecutorTest::setUp(); launchExecutorThread(); executor = &getExecutor(); } void postExecutorThreadLaunch() final {} protected: /** * Given a find command specification, 'findCmd', and a list of remote host:port pairs, * constructs the appropriate ARM. * * If 'batchSize' is set (i.e. not equal to boost::none), this batchSize is used for each * getMore. If 'findCmd' has a batchSize, this is used just for the initial find operation. */ void makeCursorFromFindCmd(const BSONObj& findCmd, const std::vector& remotes, boost::optional getMoreBatchSize = boost::none, bool isSecondaryOk = false) { const bool isExplain = true; const auto lpq = unittest::assertGet(LiteParsedQuery::makeFromFindCommand(_nss, findCmd, isExplain)); params = ClusterClientCursorParams(_nss); params.sort = lpq->getSort(); params.limit = lpq->getLimit(); params.batchSize = getMoreBatchSize ? 
getMoreBatchSize : lpq->getBatchSize(); params.skip = lpq->getSkip(); params.isTailable = lpq->isTailable(); params.isSecondaryOk = isSecondaryOk; for (const auto& hostAndPort : remotes) { params.remotes.emplace_back(hostAndPort, "testShard", findCmd); } arm = stdx::make_unique(executor, params); } /** * Given a vector of (HostAndPort, CursorIds) representing a set of existing cursors, constructs * the appropriate ARM. The default CCC parameters are used. */ void makeCursorFromExistingCursors( const std::vector>& remotes) { params = ClusterClientCursorParams(_nss); for (const auto& hostIdPair : remotes) { params.remotes.emplace_back(hostIdPair.first, hostIdPair.second); } arm = stdx::make_unique(executor, params); } /** * Schedules a list of cursor responses to be returned by the mock network. */ void scheduleNetworkResponses(std::vector responses, CursorResponse::ResponseType responseType) { std::vector objs; for (const auto& cursorResponse : responses) { objs.push_back(cursorResponse.toBSON(responseType)); } scheduleNetworkResponseObjs(objs); } /** * Schedules a list of raw BSON command responses to be returned by the mock network. 
*/ void scheduleNetworkResponseObjs(std::vector objs) { executor::NetworkInterfaceMock* net = getNet(); net->enterNetwork(); for (const auto& obj : objs) { ASSERT_TRUE(net->hasReadyRequests()); Milliseconds millis(0); RemoteCommandResponse response(obj, BSONObj(), millis); executor::TaskExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); } net->runReadyNetworkOperations(); net->exitNetwork(); } RemoteCommandRequest getFirstPendingRequest() { executor::NetworkInterfaceMock* net = getNet(); net->enterNetwork(); ASSERT_TRUE(net->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = net->getFrontOfUnscheduledQueue(); RemoteCommandRequest retRequest = noi->getRequest(); net->exitNetwork(); return retRequest; } void scheduleErrorResponse(Status status) { invariant(!status.isOK()); executor::NetworkInterfaceMock* net = getNet(); net->enterNetwork(); ASSERT_TRUE(net->hasReadyRequests()); net->scheduleResponse(net->getNextReadyRequest(), net->now(), status); net->runReadyNetworkOperations(); net->exitNetwork(); } void blackHoleNextRequest() { executor::NetworkInterfaceMock* net = getNet(); net->enterNetwork(); ASSERT_TRUE(net->hasReadyRequests()); net->blackHole(net->getNextReadyRequest()); net->exitNetwork(); } const NamespaceString _nss; const std::vector _remotes; executor::TaskExecutor* executor; ClusterClientCursorParams params; std::unique_ptr arm; }; TEST_F(AsyncResultsMergerTest, ClusterFind) { BSONObj findCmd = fromjson("{find: 'testcoll'}"); makeCursorFromFindCmd(findCmd, _remotes); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // First shard responds. 
std::vector responses; std::vector batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); // Can't return any results until we have a response from all three shards. ASSERT_FALSE(arm->ready()); // Second two shards respond. responses.clear(); std::vector batch2 = {fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(0), batch2); std::vector batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); } TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); makeCursorFromFindCmd(findCmd, _remotes); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector responses; std::vector batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(10), batch1); std::vector batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(11), batch2); std::vector batch3 = {fromjson("{_id: 5}"), 
fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(12), batch3); scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady())); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); responses.clear(); std::vector batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; responses.emplace_back(_nss, CursorId(10), batch4); std::vector batch5 = {fromjson("{_id: 9}")}; responses.emplace_back(_nss, CursorId(0), batch5); std::vector batch6 = {fromjson("{_id: 10}")}; responses.emplace_back(_nss, CursorId(0), batch6); scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 10}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 9}"), *unittest::assertGet(arm->nextReady())); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); responses.clear(); std::vector batch7 = {fromjson("{_id: 11}")}; responses.emplace_back(_nss, CursorId(0), batch7); 
scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 11}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); } TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}"); makeCursorFromFindCmd(findCmd, _remotes); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector responses; std::vector batch1 = {fromjson("{_id: 5, $sortKey: {'': 5}}"), fromjson("{_id: 6, $sortKey: {'': 6}}")}; responses.emplace_back(_nss, CursorId(0), batch1); std::vector batch2 = {fromjson("{_id: 3, $sortKey: {'': 3}}"), fromjson("{_id: 9, $sortKey: {'': 9}}")}; responses.emplace_back(_nss, CursorId(0), batch2); std::vector batch3 = {fromjson("{_id: 4, $sortKey: {'': 4}}"), fromjson("{_id: 8, $sortKey: {'': 8}}")}; responses.emplace_back(_nss, CursorId(0), batch3); scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3, $sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 4, $sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 5, $sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 6, $sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 8, $sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 9, $sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); } 
/**
 * Sorted find with getMores: a document is only handed out once every remote that still has an
 * open cursor has a buffered result to compare against.
 */
TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
    BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}, batchSize: 2}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")};
    responses.emplace_back(_nss, CursorId(1), batch1);
    std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), fromjson("{$sortKey: {'': 4}}")};
    responses.emplace_back(_nss, CursorId(0), batch2);
    std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 7}}"), fromjson("{$sortKey: {'': 8}}")};
    responses.emplace_back(_nss, CursorId(2), batch3);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 3}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 4}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 5}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 6}}"), *unittest::assertGet(arm->nextReady()));

    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    responses.clear();
    std::vector<BSONObj> batch4 = {fromjson("{$sortKey: {'': 7}}"),
                                   fromjson("{$sortKey: {'': 10}}")};
    responses.emplace_back(_nss, CursorId(0), batch4);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 7}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 8}}"), *unittest::assertGet(arm->nextReady()));

    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    responses.clear();
    std::vector<BSONObj> batch5 = {fromjson("{$sortKey: {'': 9}}"),
                                   fromjson("{$sortKey: {'': 10}}")};
    responses.emplace_back(_nss, CursorId(0), batch5);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 9}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 10}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT(!unittest::assertGet(arm->nextReady()));
}

/**
 * Sorted find on a compound key {a: -1, b: 1}: results are ordered by the first sort key component
 * descending, with ties broken by the second component ascending.
 */
TEST_F(AsyncResultsMergerTest, ClusterFindCompoundSortKey) {
    BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5, '': 9}}"),
                                   fromjson("{$sortKey: {'': 4, '': 20}}")};
    responses.emplace_back(_nss, CursorId(0), batch1);
    std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 10, '': 11}}"),
                                   fromjson("{$sortKey: {'': 4, '': 4}}")};
    responses.emplace_back(_nss, CursorId(0), batch2);
    std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"),
                                   fromjson("{$sortKey: {'': 5, '': 9}}")};
    responses.emplace_back(_nss, CursorId(0), batch3);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 11}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 10, '': 12}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 5, '': 9}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 4}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{$sortKey: {'': 4, '': 20}}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT(!unittest::assertGet(arm->nextReady()));
}

/**
 * A sorted query whose shard results lack $sortKey surfaces ErrorCodes::InternalError.
 */
TEST_F(AsyncResultsMergerTest, ClusterFindSortedButNoSortKey) {
    BSONObj findCmd = fromjson("{find: 'testcoll', sort: {a: -1, b: 1}, batchSize: 2}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // Parsing the batch results in an error because the sort key is missing.
    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")};
    responses.emplace_back(_nss, CursorId(1), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    auto statusWithNext = arm->nextReady();
    ASSERT(!statusWithNext.isOK());
    ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::InternalError);

    // Required to kill the 'arm' on error before destruction.
    auto killEvent = arm->kill();
    executor->waitForEvent(killEvent);
}

/**
 * A find with batchSize 0 yields empty first batches; the ARM keeps requesting more from any shard
 * whose cursor is still open until a non-empty batch arrives.
 */
TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
    // Initial batchSize sent with the find command is zero; batchSize sent with each getMore
    // command is one.
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 0}");
    const long long getMoreBatchSize = 1LL;
    makeCursorFromFindCmd(findCmd, {_remotes[0], _remotes[1]}, getMoreBatchSize);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // Both shards give back empty responses. Second shard doesn't have any results so it
    // sends back a cursor id of zero.
    std::vector<CursorResponse> responses;
    responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>());
    responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>());
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    // In handling the responses from the first shard, the ARM should have already asked
    // for an additional batch from that shard. It won't have anything to return until it
    // gets a non-empty response.
    ASSERT_FALSE(arm->ready());
    responses.clear();
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
    responses.emplace_back(_nss, CursorId(1), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));

    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // The shard responds with another empty batch but leaves the cursor open. It probably shouldn't
    // do this, but there's no reason the ARM can't handle this by asking for more.
    responses.clear();
    responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>());
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);

    // The shard responds with another batch and closes the cursor.
    ASSERT_FALSE(arm->ready());
    responses.clear();
    std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(0), batch2);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT(!unittest::assertGet(arm->nextReady()));
}

/**
 * An ARM built over already-established remote cursors (ids 5 and 6) merges the batches those
 * cursors return.
 */
TEST_F(AsyncResultsMergerTest, ExistingCursors) {
    makeCursorFromExistingCursors({{_remotes[0], 5}, {_remotes[1], 6}});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // NOTE(review): the remotes already have cursor ids, so these are presumably replies to
    // getMore requests; InitialResponse is kept as found -- confirm whether SubsequentResponse
    // was intended.
    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(0), batch1);
    std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(0), batch2);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT(!unittest::assertGet(arm->nextReady()));
}

/**
 * For an unsorted query, results from a responsive shard keep flowing while another shard's
 * request is never answered.
 */
TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
    makeCursorFromFindCmd(findCmd, {_remotes[0], _remotes[1]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // Both shards respond with the first batch.
    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(1), batch1);
    std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(2), batch2);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 4}"), *unittest::assertGet(arm->nextReady()));

    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // When we ask the shards for their next batch, the first shard responds and the second shard
    // never responds.
    responses.clear();
    std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
    responses.emplace_back(_nss, CursorId(1), batch3);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    blackHoleNextRequest();
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 5}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 6}"), *unittest::assertGet(arm->nextReady()));

    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    // We can continue to return results from first shard, while second shard remains unresponsive.
    responses.clear();
    std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
    responses.emplace_back(_nss, CursorId(0), batch4);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 7}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 8}"), *unittest::assertGet(arm->nextReady()));

    // Kill cursor before deleting it, as the second remote cursor has not been exhausted. We don't
    // wait on 'killEvent' here, as the blackholed request's callback will only run on shutdown of
    // the network interface.
    auto killEvent = arm->kill();
    ASSERT_TRUE(killEvent.isValid());
}

/**
 * A getMore reply carrying a different cursor id (456) than the one established by the find (123)
 * makes nextReady() return an error.
 */
TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {
        fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
    responses.emplace_back(_nss, CursorId(123), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady()));

    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    responses.clear();
    std::vector<BSONObj> batch2 = {
        fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
    responses.emplace_back(_nss, CursorId(456), batch2);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT(!arm->nextReady().isOK());

    // Required to kill the 'arm' on error before destruction.
    auto killEvent = arm->kill();
    executor->waitForEvent(killEvent);
}

/**
 * One of three shards replies with an object that is not a cursor response; nextReady() reports an
 * error.
 */
TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1)
                            .toBSON(CursorResponse::ResponseType::InitialResponse);
    BSONObj response2 = fromjson("{foo: 'bar'}");
    std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
    BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3)
                            .toBSON(CursorResponse::ResponseType::InitialResponse);
    scheduleNetworkResponseObjs({response1, response2, response3});
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    auto statusWithNext = arm->nextReady();
    ASSERT(!statusWithNext.isOK());

    // Required to kill the 'arm' on error before destruction.
    auto killEvent = arm->kill();
    executor->waitForEvent(killEvent);
}

/**
 * An error status delivered for one shard's request is returned from nextReady() with its code and
 * reason intact.
 */
TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(1), batch1);
    std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(2), batch2);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    auto statusWithNext = arm->nextReady();
    ASSERT(!statusWithNext.isOK());
    ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::BadValue);
    ASSERT_EQ(statusWithNext.getStatus().reason(), "bad thing happened");

    // Required to kill the 'arm' on error before destruction.
    auto killEvent = arm->kill();
    executor->waitForEvent(killEvent);
}

/**
 * Requesting a second event while the first is still unsignaled fails, but does not disturb the
 * first event or the results delivered afterwards.
 */
TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());

    // Error to call nextEvent() before the previous event is signaled.
    ASSERT_NOT_OK(arm->nextEvent().getStatus());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(0), batch);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT(!unittest::assertGet(arm->nextReady()));

    // Required to kill the 'arm' on error before destruction.
    auto killEvent = arm->kill();
    executor->waitForEvent(killEvent);
}

/**
 * After the executor has shut down, nextEvent() fails and kill() returns an invalid event handle.
 */
TEST_F(AsyncResultsMergerTest, NextEventAfterTaskExecutorShutdown) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, _remotes);
    executor->shutdown();
    ASSERT_NOT_OK(arm->nextEvent().getStatus());
    auto killEvent = arm->kill();
    ASSERT_FALSE(killEvent.isValid());
}

/**
 * kill() returns an invalid event handle when the executor shut down while a request was still
 * outstanding.
 */
TEST_F(AsyncResultsMergerTest, KillAfterTaskExecutorShutdownWithOutstandingBatches) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    // Make a request to the shard that will never get answered.
    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());
    blackHoleNextRequest();

    // Executor shuts down before a response is received.
    executor->shutdown();
    auto killEvent = arm->kill();
    ASSERT_FALSE(killEvent.isValid());
}

/**
 * Killing an ARM that never issued a request completes and leaves it "ready" with an error.
 */
TEST_F(AsyncResultsMergerTest, KillNoBatchesRequested) {
    BSONObj findCmd = fromjson("{find: 'testcoll'}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto killedEvent = arm->kill();

    // Killed cursors are considered ready, but return an error when you try to receive the next
    // doc.
    ASSERT_TRUE(arm->ready());
    ASSERT_NOT_OK(arm->nextReady().getStatus());

    executor->waitForEvent(killedEvent);
}

/**
 * Killing after every shard has answered its initial request (one cursor is still open remotely).
 */
TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(0), batch1);
    std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(0), batch2);
    std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(123), batch3);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    // Kill should be able to return right away if there are no pending batches.
    auto killedEvent = arm->kill();
    ASSERT_TRUE(arm->ready());
    ASSERT_NOT_OK(arm->nextReady().getStatus());
    executor->waitForEvent(killedEvent);
}

/**
 * Killing while two of three initial requests are still outstanding: the kill completes only after
 * those replies arrive, and a killCursors is issued for the one cursor left open.
 */
TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
    makeCursorFromFindCmd(findCmd, _remotes);

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(0), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    // Kill event will only be signalled once the pending batches are received.
    auto killedEvent = arm->kill();

    // After the kill, the ARM waits for outstanding batches to come back. This ensures that we
    // receive cursor ids for any established remote cursors, and can clean them up by issuing
    // killCursors commands.
    responses.clear();
    std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(123), batch2);
    std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
    // Send 'batch3' for the third shard; it was previously declared but 'batch2' was sent twice.
    responses.emplace_back(_nss, CursorId(0), batch3);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    // Only one of the responses has a non-zero cursor id. The ARM should have issued a killCursors
    // command against this id.
    BSONObj expectedCmdObj = BSON("killCursors"
                                  << "testcoll"
                                  << "cursors" << BSON_ARRAY(CursorId(123)));
    ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);

    // Ensure that we properly signal both those waiting for the kill, and those waiting for more
    // results to be ready.
    executor->waitForEvent(readyEvent);
    executor->waitForEvent(killedEvent);
}

/**
 * Killing while a getMore is in flight: the kill completes after the getMore reply is processed,
 * at which point a killCursors for the remote cursor is pending.
 */
TEST_F(AsyncResultsMergerTest, KillOutstandingGetMore) {
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(123), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    // First batch received.
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));

    // This will schedule a getMore on cursor id 123.
    ASSERT_FALSE(arm->ready());
    readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    auto killedEvent = arm->kill();

    // The kill can't complete until the getMore response is received.
    responses.clear();
    std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
    responses.emplace_back(_nss, CursorId(123), batch2);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);

    // While processing the getMore response, a killCursors against id 123 should have been
    // scheduled.
    BSONObj expectedCmdObj = BSON("killCursors"
                                  << "testcoll"
                                  << "cursors" << BSON_ARRAY(CursorId(123)));
    ASSERT_EQ(getFirstPendingRequest().cmdObj, expectedCmdObj);

    // Ensure that we properly signal both those waiting for the kill, and those waiting for more
    // results to be ready.
    executor->waitForEvent(readyEvent);
    executor->waitForEvent(killedEvent);
}

/**
 * nextEvent() fails once the ARM has been killed.
 */
TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(1), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);

    auto killedEvent = arm->kill();

    // Attempting to schedule more network operations on a killed arm is an error.
    ASSERT_NOT_OK(arm->nextEvent().getStatus());

    executor->waitForEvent(killedEvent);
}

/**
 * Calling kill() twice yields a valid event each time, and both events get signaled.
 */
TEST_F(AsyncResultsMergerTest, KillCalledTwice) {
    BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});
    auto killedEvent1 = arm->kill();
    ASSERT(killedEvent1.isValid());
    auto killedEvent2 = arm->kill();
    ASSERT(killedEvent2.isValid());
    executor->waitForEvent(killedEvent1);
    executor->waitForEvent(killedEvent2);
}

TEST_F(AsyncResultsMergerTest, TailableBasic) {
    BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
    makeCursorFromFindCmd(findCmd, {_remotes[0]});

    ASSERT_FALSE(arm->ready());
    auto readyEvent = unittest::assertGet(arm->nextEvent());
    ASSERT_FALSE(arm->ready());

    std::vector<CursorResponse> responses;
    std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
    responses.emplace_back(_nss, CursorId(123), batch1);
    scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
    executor->waitForEvent(readyEvent);

    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()));
    ASSERT_TRUE(arm->ready());
    ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady()));

    // In the tailable case, we expect boost::none after every batch.
ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); ASSERT_FALSE(arm->ready()); readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); responses.clear(); std::vector batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(123), batch2); scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); auto killedEvent = arm->kill(); executor->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); makeCursorFromFindCmd(findCmd, {_remotes[0]}); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); // Remote responds with an empty batch and a non-zero cursor id. std::vector responses; std::vector batch; responses.emplace_back(_nss, CursorId(123), batch); scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); // After receiving an empty batch, the ARM should return boost::none. 
ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); auto killedEvent = arm->kill(); executor->waitForEvent(killedEvent); } TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 3}"); makeCursorFromFindCmd(findCmd, {_remotes[0]}); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); std::vector responses; std::vector batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 2}"), *unittest::assertGet(arm->nextReady())); ASSERT_FALSE(arm->ready()); responses.clear(); std::vector batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch2); readyEvent = unittest::assertGet(arm->nextEvent()); BSONObj scheduledCmd = getFirstPendingRequest().cmdObj; auto request = GetMoreRequest::parseFromBSON("anydbname", scheduledCmd); ASSERT_OK(request.getStatus()); ASSERT_EQ(*request.getValue().batchSize, 1LL); ASSERT_EQ(request.getValue().cursorid, 1LL); scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 3}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); } TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { BSONObj findCmd = fromjson("{find: 'testcoll', batchSize: 2}"); const bool isSecondaryOk = true; makeCursorFromFindCmd(findCmd, {_remotes[0]}, boost::none, isSecondaryOk); ASSERT_FALSE(arm->ready()); auto readyEvent = unittest::assertGet(arm->nextEvent()); ASSERT_FALSE(arm->ready()); BSONObj 
cmdRequestMetadata = getFirstPendingRequest().metadata; ASSERT_EQ(cmdRequestMetadata, BSON("$secondaryOk" << 1)); std::vector responses; std::vector batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch1); scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady())); ASSERT_TRUE(arm->ready()); ASSERT(!unittest::assertGet(arm->nextReady())); } } // namespace } // namespace mongo