summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorIan Boros <ian.boros@10gen.com>2019-01-04 18:20:17 -0500
committerIan Boros <ian.boros@10gen.com>2019-01-11 16:14:17 -0500
commit424621d33255e28cf5f4988935a0d175f6e9fd78 (patch)
tree71c5c5c14b0185c5a688511b3e1b67a517e63904 /src/mongo
parent73bce03305b2aa8c94d2400ffdb848ccb0f27588 (diff)
downloadmongo-424621d33255e28cf5f4988935a0d175f6e9fd78.tar.gz
SERVER-38858 fix race
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/s/query/blocking_results_merger_test.cpp58
-rw-r--r--src/mongo/s/query/results_merger_test_fixture.h8
2 files changed, 56 insertions, 10 deletions
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
index 2f76d576e3d..ed2b33dec5b 100644
--- a/src/mongo/s/query/blocking_results_merger_test.cpp
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/query/find_common.h"
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/results_merger_test_fixture.h"
#include "mongo/unittest/unittest.h"
@@ -53,6 +54,11 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) {
}
TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) {
+ // Set the deadline to be two seconds in the future. We always test that the deadline
+ // expires, so there's no racing.
+ awaitDataState(operationContext()).waitForInsertsDeadline =
+ getMockClockSource()->now() + Milliseconds{2000};
+
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
@@ -63,14 +69,30 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) {
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
- auto next = unittest::assertGet(
- blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+ // Pass kGetMoreNoResultsYet so that the BRM will block and not just
+ // return an empty batch immediately.
+ auto next = unittest::assertGet(blockingMerger.next(
+ operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
- // The timeout should hit, and return an empty object.
+ // The timeout should hit, and return EOF.
ASSERT_TRUE(next.isEOF());
});
+ // Wait for a bit. Hopefully the other thread will be waiting for the clock to advance.
+ // If not, we just advance the clock now, and when the other thread gets to that point
+ // it will see that "now" has passed the deadline.
+ sleepsecs(1);
+
+ getMockClockSource()->advance(Milliseconds{3000});
+
future.timed_get(kFutureTimeout);
+
+ // Answer the getMore, so that there are no more outstanding requests.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSONObj()})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
blockingMerger.kill(operationContext());
}
@@ -106,6 +128,12 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
}
TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDeadline) {
+ // Set the deadline to be two seconds in the future. We always test that the deadline
+ // expires, so there's no racing.
+ awaitDataState(operationContext()).waitForInsertsDeadline =
+ operationContext()->getServiceContext()->getPreciseClockSource()->now() +
+ Milliseconds{2000};
+
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
@@ -114,21 +142,31 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDe
BlockingResultsMerger blockingMerger(
operationContext(), std::move(params), executor(), nullptr);
- // Used for synchronizing the background thread with this thread.
- stdx::mutex mutex;
- stdx::unique_lock<stdx::mutex> lk(mutex);
-
- // Issue a blocking wait for the next result asynchronously on a different thread.
+ // Will schedule a getMore. No one will send a response in time, so will return EOF.
auto future = launchAsync([&]() {
- // Will schedule a getMore. No one will send a response, so will return EOF.
auto next = unittest::assertGet(blockingMerger.next(
operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
ASSERT_TRUE(next.isEOF());
+ });
+
+ // Wait for a bit. Hopefully the other thread will be waiting for the clock to advance.
+ // If not, we just advance the clock now, and when the other thread gets to that point
+ // it will see that "now" has passed the deadline.
+ sleepsecs(1);
+ getMockClockSource()->advance(Milliseconds{3000});
+
+ future.timed_get(kFutureTimeout);
+ // Used for synchronizing the background thread with this thread.
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ future = launchAsync([&]() {
// Block until the main thread has responded to the getMore.
stdx::unique_lock<stdx::mutex> lk(mutex);
- next = unittest::assertGet(blockingMerger.next(
+ auto next = unittest::assertGet(blockingMerger.next(
operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
ASSERT_FALSE(next.isEOF());
ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h
index 7b714c568ae..7a0f46c9058 100644
--- a/src/mongo/s/query/results_merger_test_fixture.h
+++ b/src/mongo/s/query/results_merger_test_fixture.h
@@ -32,6 +32,7 @@
#include "mongo/s/query/async_results_merger.h"
#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/util/clock_source_mock.h"
namespace mongo {
@@ -241,6 +242,13 @@ protected:
remoteCursor.setCursorResponse(std::move(response));
return remoteCursor;
}
+
+ ClockSourceMock* getMockClockSource() {
+ ClockSourceMock* mockClock = dynamic_cast<ClockSourceMock*>(
+ operationContext()->getServiceContext()->getPreciseClockSource());
+ invariant(mockClock);
+ return mockClock;
+ }
};
} // namespace mongo