summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/blocking_results_merger_test.cpp
diff options
context:
space:
mode:
authorIan Boros <ian.boros@10gen.com>2018-11-28 18:38:15 -0500
committerIan Boros <ian.boros@10gen.com>2018-12-17 16:36:20 -0500
commitd40d24abc025690150ccf8009ba1facb9ed1c6b2 (patch)
tree8ba35e0d4f8c7e441ffe80de30faa24a708c436e /src/mongo/s/query/blocking_results_merger_test.cpp
parentb37b5ef7ec0ec2e502423d53e6c0d6e86b343c27 (diff)
downloadmongo-d40d24abc025690150ccf8009ba1facb9ed1c6b2.tar.gz
SERVER-33683 Prevent deadlock in aggregate with transactions
Diffstat (limited to 'src/mongo/s/query/blocking_results_merger_test.cpp')
-rw-r--r--src/mongo/s/query/blocking_results_merger_test.cpp156
1 files changed, 152 insertions, 4 deletions
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
index 961b3b5912c..2f76d576e3d 100644
--- a/src/mongo/s/query/blocking_results_merger_test.cpp
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/results_merger_test_fixture.h"
+#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -43,9 +44,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ nullptr);
+
+ blockingMerger.kill(operationContext());
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
BlockingResultsMerger blockingMerger(
- operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+ operationContext(), std::move(params), executor(), nullptr);
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto next = unittest::assertGet(
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+
+ // The timeout should hit, and return an empty object.
+ ASSERT_TRUE(next.isEOF());
+ });
+
+ future.timed_get(kFutureTimeout);
blockingMerger.kill(operationContext());
}
@@ -53,8 +78,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- BlockingResultsMerger blockingMerger(
- operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ nullptr);
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -78,12 +105,57 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
future.timed_get(kFutureTimeout);
}
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDeadline) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ BlockingResultsMerger blockingMerger(
+ operationContext(), std::move(params), executor(), nullptr);
+
+ // Used for synchronizing the background thread with this thread.
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ // Will schedule a getMore. No one will send a response, so will return EOF.
+ auto next = unittest::assertGet(blockingMerger.next(
+ operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
+ ASSERT_TRUE(next.isEOF());
+
+ // Block until the main thread has responded to the getMore.
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ next = unittest::assertGet(blockingMerger.next(
+ operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
+ ASSERT_FALSE(next.isEOF());
+ ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
+
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ // Unblock the other thread, allowing it to call next() on the BlockingResultsMerger.
+ lk.unlock();
+
+ future.timed_get(kFutureTimeout);
+}
+
TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
auto params = makeARMParamsFromExistingCursors(std::move(cursors));
- BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor());
+ BlockingResultsMerger blockingMerger(
+ operationContext(), std::move(params), executor(), nullptr);
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -116,5 +188,81 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
future.timed_get(kFutureTimeout);
}
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) {
+ class ThrowyResourceYielder : public ResourceYielder {
+ public:
+ void yield(OperationContext*) {
+ uasserted(ErrorCodes::BadValue, "Simulated error");
+ }
+
+ void unyield(OperationContext*) {}
+ };
+
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ std::make_unique<ThrowyResourceYielder>());
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ // Make sure that the next() call throws correctly.
+ const auto status =
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)
+ .getStatus();
+ ASSERT_EQ(status, ErrorCodes::BadValue);
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenUnyielding) {
+ class ThrowyResourceYielder : public ResourceYielder {
+ public:
+ void yield(OperationContext*) {}
+
+ void unyield(OperationContext*) {
+ uasserted(ErrorCodes::BadValue, "Simulated error");
+ }
+ };
+
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ std::make_unique<ThrowyResourceYielder>());
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ // Make sure that the next() call throws correctly.
+ const auto status =
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)
+ .getStatus();
+ ASSERT_EQ(status, ErrorCodes::BadValue);
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
} // namespace
} // namespace mongo