summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/async_results_merger_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/query/async_results_merger_test.cpp')
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp280
1 files changed, 2 insertions, 278 deletions
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 7960d22f018..b852cf33f79 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -30,18 +30,13 @@
#include "mongo/s/query/async_results_merger.h"
-#include "mongo/client/remote_command_targeter_factory_mock.h"
-#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/json.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
-#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
-#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/query/results_merger_test_fixture.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
@@ -50,212 +45,11 @@ namespace mongo {
namespace {
-using executor::NetworkInterfaceMock;
-using executor::RemoteCommandRequest;
-using executor::RemoteCommandResponse;
-
-using ResponseStatus = executor::TaskExecutor::ResponseStatus;
-
-const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-const std::vector<ShardId> kTestShardIds = {
- ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
-const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
- HostAndPort("FakeShard2Host", 12345),
- HostAndPort("FakeShard3Host", 12345)};
-
-const NamespaceString kTestNss("testdb.testcoll");
-
LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) {
return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj());
}
-class AsyncResultsMergerTest : public ShardingTestFixture {
-public:
- AsyncResultsMergerTest() {}
-
- void setUp() override {
- setRemote(HostAndPort("ClientHost", 12345));
-
- configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
-
- std::vector<ShardType> shards;
-
- for (size_t i = 0; i < kTestShardIds.size(); i++) {
- ShardType shardType;
- shardType.setName(kTestShardIds[i].toString());
- shardType.setHost(kTestShardHosts[i].toString());
-
- shards.push_back(shardType);
-
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
- stdx::make_unique<RemoteCommandTargeterMock>());
- targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
- targeter->setFindHostReturnValue(kTestShardHosts[i]);
-
- targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
- std::move(targeter));
- }
-
- setupShards(shards);
- }
-
-protected:
- /**
- * Constructs an ARM with the given vector of existing cursors.
- *
- * If 'findCmd' is not set, the default AsyncResultsMergerParams are used.
- * Otherwise, the 'findCmd' is used to construct the AsyncResultsMergerParams.
- *
- * 'findCmd' should not have a 'batchSize', since the find's batchSize is used just in the
- * initial find. The getMore 'batchSize' can be passed in through 'getMoreBatchSize.'
- */
- std::unique_ptr<AsyncResultsMerger> makeARMFromExistingCursors(
- std::vector<RemoteCursor> remoteCursors,
- boost::optional<BSONObj> findCmd = boost::none,
- boost::optional<std::int64_t> getMoreBatchSize = boost::none) {
- AsyncResultsMergerParams params;
- params.setNss(kTestNss);
- params.setRemotes(std::move(remoteCursors));
-
-
- if (findCmd) {
- const auto qr = unittest::assertGet(
- QueryRequest::makeFromFindCommand(kTestNss, *findCmd, false /* isExplain */));
- if (!qr->getSort().isEmpty()) {
- params.setSort(qr->getSort().getOwned());
- }
-
- if (getMoreBatchSize) {
- params.setBatchSize(getMoreBatchSize);
- } else {
- params.setBatchSize(qr->getBatchSize()
- ? boost::optional<std::int64_t>(
- static_cast<std::int64_t>(*qr->getBatchSize()))
- : boost::none);
- }
- params.setTailableMode(qr->getTailableMode());
- params.setAllowPartialResults(qr->isAllowPartialResults());
- }
-
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(operationContext()->getLogicalSessionId());
- sessionInfo.setTxnNumber(operationContext()->getTxnNumber());
- params.setOperationSessionInfo(sessionInfo);
-
- return stdx::make_unique<AsyncResultsMerger>(
- operationContext(), executor(), std::move(params));
- }
-
- /**
- * Schedules a "CommandOnShardedViewNotSupportedOnMongod" error response w/ view definition.
- */
- void scheduleNetworkViewResponse(const std::string& ns, const std::string& pipelineJsonArr) {
- BSONObjBuilder viewDefBob;
- viewDefBob.append("ns", ns);
- viewDefBob.append("pipeline", fromjson(pipelineJsonArr));
-
- BSONObjBuilder bob;
- bob.append("resolvedView", viewDefBob.obj());
- bob.append("ok", 0.0);
- bob.append("errmsg", "Command on view must be executed by mongos");
- bob.append("code", 169);
-
- std::vector<BSONObj> batch = {bob.obj()};
- scheduleNetworkResponseObjs(batch);
- }
-
- /**
- * Schedules a list of cursor responses to be returned by the mock network.
- */
- void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
- std::vector<BSONObj> objs;
- for (const auto& cursorResponse : responses) {
- // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are
- // subsequent responses, since the AsyncResultsMerger will only ever run getMores.
- objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse));
- }
- scheduleNetworkResponseObjs(objs);
- }
-
- /**
- * Schedules a list of raw BSON command responses to be returned by the mock network.
- */
- void scheduleNetworkResponseObjs(std::vector<BSONObj> objs) {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- for (const auto& obj : objs) {
- ASSERT_TRUE(net->hasReadyRequests());
- Milliseconds millis(0);
- RemoteCommandResponse response(obj, millis);
- executor::TaskExecutor::ResponseStatus responseStatus(response);
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus);
- }
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- RemoteCommandRequest getNthPendingRequest(size_t n) {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkInterfaceMock::NetworkOperationIterator noi = net->getNthUnscheduledRequest(n);
- RemoteCommandRequest retRequest = noi->getRequest();
- net->exitNetwork();
- return retRequest;
- }
-
- bool networkHasReadyRequests() {
- NetworkInterfaceMock::InNetworkGuard guard(network());
- return guard->hasReadyRequests();
- }
-
- void scheduleErrorResponse(ResponseStatus rs) {
- invariant(!rs.isOK());
- rs.elapsedMillis = Milliseconds(0);
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- net->scheduleResponse(net->getNextReadyRequest(), net->now(), rs);
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- void runReadyCallbacks() {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- net->runReadyNetworkOperations();
- net->exitNetwork();
- }
-
- void blackHoleNextRequest() {
- executor::NetworkInterfaceMock* net = network();
- net->enterNetwork();
- ASSERT_TRUE(net->hasReadyRequests());
- net->blackHole(net->getNextReadyRequest());
- net->exitNetwork();
- }
-};
-
-void assertKillCusorsCmdHasCursorId(const BSONObj& killCmd, CursorId cursorId) {
- ASSERT_TRUE(killCmd.hasElement("killCursors"));
- ASSERT_EQ(killCmd["cursors"].type(), BSONType::Array);
-
- size_t numCursors = 0;
- for (auto&& cursor : killCmd["cursors"].Obj()) {
- ASSERT_EQ(cursor.type(), BSONType::NumberLong);
- ASSERT_EQ(cursor.numberLong(), cursorId);
- ++numCursors;
- }
- ASSERT_EQ(numCursors, 1u);
-}
-
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
+using AsyncResultsMergerTest = ResultsMergerTestFixture;
TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) {
std::vector<RemoteCursor> cursors;
@@ -1888,76 +1682,6 @@ TEST_F(AsyncResultsMergerTest, KillShouldNotWaitForRemoteCommandsBeforeSchedulin
executor()->waitForEvent(killEvent);
}
-TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilNextResultIsReady) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Before any requests are scheduled, ARM is not ready to return results.
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
- auto future = launchAsync([&]() {
- auto next = unittest::assertGet(arm->blockingNext());
- ASSERT_FALSE(next.isEOF());
- ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
- next = unittest::assertGet(arm->blockingNext());
- ASSERT_TRUE(next.isEOF());
- });
-
- // Schedule the response to the getMore which will return the next result and mark the cursor as
- // exhausted.
- onCommand([&](const auto& request) {
- ASSERT(request.cmdObj["getMore"]);
- return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
- .toBSON(CursorResponse::ResponseType::SubsequentResponse);
- });
-
- future.timed_get(kFutureTimeout);
-}
-
-TEST_F(AsyncResultsMergerTest, ShouldBeInterruptableDuringBlockingNext) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
- auto future = launchAsync([&]() {
- auto nextStatus = arm->blockingNext();
- ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
- });
-
- // Now mark the OperationContext as killed from this thread.
- {
- stdx::lock_guard<Client> lk(*operationContext()->getClient());
- operationContext()->markKilled(ErrorCodes::Interrupted);
- }
- future.timed_get(kFutureTimeout);
- // Be careful not to use a blocking kill here, since the main thread is in charge of running the
- // callbacks, and we'd block on ourselves.
- auto killEvent = arm->kill(operationContext());
-
- assertKillCusorsCmdHasCursorId(getNthPendingRequest(0u).cmdObj, 1);
- runReadyCallbacks();
- executor()->waitForEvent(killEvent);
-}
-
-TEST_F(AsyncResultsMergerTest, ShouldBeAbleToBlockUntilKilled) {
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- auto arm = makeARMFromExistingCursors(std::move(cursors));
-
- // Before any requests are scheduled, ARM is not ready to return results.
- ASSERT_FALSE(arm->ready());
- ASSERT_FALSE(arm->remotesExhausted());
-
- arm->blockingKill(operationContext());
-}
-
TEST_F(AsyncResultsMergerTest, GetMoresShouldNotIncludeLSIDOrTxnNumberIfNoneSpecified) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(