diff options
Diffstat (limited to 'src/mongo/s/query/blocking_results_merger_test.cpp')
-rw-r--r-- | src/mongo/s/query/blocking_results_merger_test.cpp | 156 |
1 files changed, 152 insertions, 4 deletions
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp index 961b3b5912c..2f76d576e3d 100644 --- a/src/mongo/s/query/blocking_results_merger_test.cpp +++ b/src/mongo/s/query/blocking_results_merger_test.cpp @@ -32,6 +32,7 @@ #include "mongo/s/query/blocking_results_merger.h" #include "mongo/s/query/results_merger_test_fixture.h" +#include "mongo/unittest/unittest.h" namespace mongo { @@ -43,9 +44,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) { std::vector<RemoteCursor> cursors; cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + nullptr); + + blockingMerger.kill(operationContext()); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); BlockingResultsMerger blockingMerger( - operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + operationContext(), std::move(params), executor(), nullptr); + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto next = unittest::assertGet( + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)); + + // The timeout should hit, and return an empty object. + ASSERT_TRUE(next.isEOF()); + }); + + future.timed_get(kFutureTimeout); blockingMerger.kill(operationContext()); } @@ -53,8 +78,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) { std::vector<RemoteCursor> cursors; cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - BlockingResultsMerger blockingMerger( - operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + nullptr); // Issue a blocking wait for the next result asynchronously on a different thread. auto future = launchAsync([&]() { @@ -78,12 +105,57 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) { future.timed_get(kFutureTimeout); } +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDeadline) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + BlockingResultsMerger blockingMerger( + operationContext(), std::move(params), executor(), nullptr); + + // Used for synchronizing the background thread with this thread. + stdx::mutex mutex; + stdx::unique_lock<stdx::mutex> lk(mutex); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + // Will schedule a getMore. No one will send a response, so will return EOF. + auto next = unittest::assertGet(blockingMerger.next( + operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet)); + ASSERT_TRUE(next.isEOF()); + + // Block until the main thread has responded to the getMore. + stdx::unique_lock<stdx::mutex> lk(mutex); + + next = unittest::assertGet(blockingMerger.next( + operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet)); + ASSERT_FALSE(next.isEOF()); + ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); + + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + // Unblock the other thread, allowing it to call next() on the BlockingResultsMerger. + lk.unlock(); + + future.timed_get(kFutureTimeout); +} + TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) { std::vector<RemoteCursor> cursors; cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); auto params = makeARMParamsFromExistingCursors(std::move(cursors)); - BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor()); + BlockingResultsMerger blockingMerger( + operationContext(), std::move(params), executor(), nullptr); // Issue a blocking wait for the next result asynchronously on a different thread. auto future = launchAsync([&]() { @@ -116,5 +188,81 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) { future.timed_get(kFutureTimeout); } +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) { + class ThrowyResourceYielder : public ResourceYielder { + public: + void yield(OperationContext*) { + uasserted(ErrorCodes::BadValue, "Simulated error"); + } + + void unyield(OperationContext*) {} + }; + + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + std::make_unique<ThrowyResourceYielder>()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + // Make sure that the next() call throws correctly. + const auto status = + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind) + .getStatus(); + ASSERT_EQ(status, ErrorCodes::BadValue); + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenUnyielding) { + class ThrowyResourceYielder : public ResourceYielder { + public: + void yield(OperationContext*) {} + + void unyield(OperationContext*) { + uasserted(ErrorCodes::BadValue, "Simulated error"); + } + }; + + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + std::make_unique<ThrowyResourceYielder>()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + // Make sure that the next() call throws correctly. + const auto status = + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind) + .getStatus(); + ASSERT_EQ(status, ErrorCodes::BadValue); + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.timed_get(kFutureTimeout); +} + } // namespace } // namespace mongo |