summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Tarzia <steve.tarzia@mongodb.com>2022-11-16 15:41:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-16 17:28:42 +0000
commit29296c1ed9d000f761d908e1c32363acb0802568 (patch)
tree9df2989f545f7726fd2713446bca5eab8324b9ea
parent931838789d56be706032fa6d3b6062aa38c7ee83 (diff)
downloadmongo-29296c1ed9d000f761d908e1c32363acb0802568.tar.gz
SERVER-71372 fix getMore of partial results from find on sharded cluster
-rw-r--r--jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js177
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp3
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl_test.cpp51
3 files changed, 150 insertions, 81 deletions
diff --git a/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js b/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js
index fdf85caea0a..e2892b96be2 100644
--- a/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js
+++ b/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js
@@ -1,7 +1,7 @@
/**
* SERVER-57469: Test that the 'allowPartialResults' option to find is respected when used together
* with 'maxTimeMS' and only a subset of the shards provide data before the timeout.
- * Uses both failpoints and MongoBridge to simulate MaxTimeMSExpired.
+ * Uses three methods to simulate MaxTimeMSExpired: failpoints, MongoBridge, and $where + sleep.
*
* @tags: [
* requires_sharding,
@@ -41,7 +41,7 @@ const coll = st.s0.getDB(dbName)[collName];
function initDb(numSamples, splitPoint) {
coll.drop();
- // Use ranged sharding with 50% of the data on the second shard.
+ // Use ranged sharding with a specified fraction of the data on the second shard.
st.shardColl(
coll,
{_id: 1}, // shard key
@@ -57,27 +57,30 @@ function initDb(numSamples, splitPoint) {
}
// Insert some data.
-const size = 1000;
-const splitPoint = Math.max(1, size / 2);
-initDb(size, splitPoint);
+const nDocs = 200;
+const splitPoint = Math.max(1, nDocs / 2);
+initDb(nDocs, splitPoint);
// We will sometimes use $where expressions to inject delays in processing documents on some shards.
// Maps from shard to a snippet of JS code. This is modified by FindWhereSleepController
let whereExpressions = {};
+// Builds the JS source used as the $where filter: every snippet currently registered in
+// 'whereExpressions' concatenated in order, followed by a final "return 1;" so that all
+// documents match.
+function whereCode() {
+    const shardSnippets = Object.values(whereExpressions);
+    return `${shardSnippets.join("")}return 1;`;
+}
function runQueryWithTimeout(doAllowPartialResults, timeout) {
return coll.runCommand({
find: collName,
- filter: {$where: Object.values(whereExpressions).join("") + "return 1;"},
+ filter: {$where: whereCode()},
allowPartialResults: doAllowPartialResults,
- batchSize: size,
+ batchSize: nDocs,
maxTimeMS: timeout
});
}
-// Set ampleTimeMS to at least two seconds, plus ten times the basic query runtime.
+// Set ampleTimeMS to at least 100ms, plus ten times the basic query runtime.
// This timeout will provide ample time for our queries to run to completion.
-const ampleTimeMS = 2000 + 10 * runtimeMillis(() => runQueryWithTimeout(true, 999999999));
+const ampleTimeMS = 100 + 10 * runtimeMillis(() => runQueryWithTimeout(true, 999999999));
print("ampleTimeMS: " + ampleTimeMS);
// Try to fetch all the data in one batch, with ample time allowed.
@@ -96,16 +99,14 @@ function runQuery(doAllowPartialResults) {
fpMongos.off();
}
-const batchSizeForGetMore = 10;
-
// Simulate mongos timeout during getMore.
-function getMoreMongosTimeout(allowPartialResults) {
+function getMoreMongosTimeout(allowPartialResults, batchSize) {
// Get the first batch.
const res = assert.commandWorked(coll.runCommand({
find: collName,
- filter: {$where: Object.values(whereExpressions).join("") + "return 1;"},
+ filter: {$where: whereCode()},
allowPartialResults: allowPartialResults,
- batchSize: batchSizeForGetMore,
+ batchSize: batchSize,
maxTimeMS: ampleTimeMS
}));
assert(!res.cursor.hasOwnProperty("partialResultsReturned"));
@@ -115,10 +116,10 @@ function getMoreMongosTimeout(allowPartialResults) {
// Run getmores repeatedly until we exhaust the cache on mongos.
// Eventually we should get either a MaxTimeMS error or partial results because a shard is down.
- let numReturned = batchSizeForGetMore; // One batch was returned so far.
+ let numReturned = batchSize; // One batch was returned so far.
while (true) {
- const res2 = coll.runCommand(
- {getMore: res.cursor.id, collection: collName, batchSize: batchSizeForGetMore});
+ const res2 =
+ coll.runCommand({getMore: res.cursor.id, collection: collName, batchSize: batchSize});
if (isError(res2)) {
assert.commandFailedWithCode(
res2, ErrorCodes.MaxTimeMSExpired, "failure should be due to MaxTimeMSExpired");
@@ -128,20 +129,45 @@ function getMoreMongosTimeout(allowPartialResults) {
// are returned even if MaxTimeMS expired on mongos.
numReturned += res2.cursor.nextBatch.length;
print(numReturned + " docs returned so far");
- assert.neq(numReturned, size, "Got full results even through mongos had MaxTimeMSExpired.");
+ assert.neq(
+ numReturned, nDocs, "Got full results even through mongos had MaxTimeMSExpired.");
if (res2.cursor.partialResultsReturned) {
assert(allowPartialResults);
- assert.lt(numReturned, size);
+ assert.lt(numReturned, nDocs);
break;
}
}
fpMongos.off();
}
-getMoreMongosTimeout(true);
-getMoreMongosTimeout(false);
+// Run the getMore tests with two batch sizes.
+// In the first case, we have (splitPoint % batchSize == 0) and the getMores will likely
+// exhaust the live shard without requiring any data from the dead shard. When the getMore to
+// the dead shard times out, it will be the only unexhausted remote.
+// In the second case, choosing (splitPoint % batchSize != 0), the final getMore will be
+// requesting data from both shards. Data from the live shard should be returned despite the dead
+// shard timing out.
+const batchSizesForGetMore = [50, 47];
+assert.eq(splitPoint % batchSizesForGetMore[0], 0);
+assert.neq(splitPoint % batchSizesForGetMore[1], 0);
+assert.lt(batchSizesForGetMore[0], splitPoint);
+assert.lt(batchSizesForGetMore[1], splitPoint);
+
+// Invokes 'callback' once for each entry of 'batchSizesForGetMore', in array order.
+// Iterating the array (rather than indexing [0] and [1]) ensures that any batch size added to
+// the list later is exercised too instead of being silently skipped.
+function withEachBatchSize(callback) {
+    for (const batchSize of batchSizesForGetMore) {
+        callback(batchSize);
+    }
+}
+
+// Invokes 'callback' twice: first with allowPartialResults=true, then with false.
+function withEachValueOfAllowPartialResults(callback) {
+    for (const allowPartialResults of [true, false]) {
+        callback(allowPartialResults);
+    }
+}
+
+withEachValueOfAllowPartialResults(
+ allowPartialResults =>
+ withEachBatchSize(batchSize => getMoreMongosTimeout(allowPartialResults, batchSize)));
-// Test shard timeouts. These are the scenario that we expect to be possible in practice.
-// Test using both failpoints and mongo bridge, testing slightly different execution paths.
+// Test shard timeouts. These are the scenarios that we expect to be likely in practice.
+// Test using 3 different types of simulated timeouts, giving slightly different execution paths.
class MaxTimeMSFailpointFailureController {
constructor(mongoInstance) {
@@ -161,6 +187,7 @@ class MaxTimeMSFailpointFailureController {
class NetworkFailureController {
constructor(shard) {
this.shard = shard;
+ this.delayTime = Math.round(1.1 * ampleTimeMS);
}
enable() {
@@ -169,12 +196,13 @@ class NetworkFailureController {
// lost. We delay instead of dropping messages because this lets the shard request proceed
// without connection failure and retry (which has its own driver-controlled timeout,
// typically 15s).
- this.shard.getPrimary().delayMessagesFrom(st.s, 2 * ampleTimeMS);
+ this.shard.getPrimary().delayMessagesFrom(st.s, this.delayTime);
}
disable() {
this.shard.getPrimary().delayMessagesFrom(st.s, 0);
- sleep(2 * ampleTimeMS); // Allow time for delayed messages to be flushed.
+ // Allow time for delayed messages to be flushed so that the next request is not delayed.
+ sleep(this.delayTime);
}
}
@@ -205,10 +233,10 @@ class FindWhereSleepController {
// Add a $where expression to find command that sleeps when processing a document on the
// shard of interest.
let slowDocId = (this.shard == st.shard0) ? 0 : splitPoint;
- // Offset the slowDocId by batchSizeForGetMore so that when testing getMore, we quickly
- // return enough documents to serve the first batch without timing out.
- slowDocId += batchSizeForGetMore;
- const sleepTimeMS = 2 * ampleTimeMS;
+ // Offset the slowDocId by at least the getMore batch size so that when testing getMore,
+ // we quickly return enough documents to serve the first batch without timing out.
+ slowDocId += Math.max(...batchSizesForGetMore);
+ const sleepTimeMS = Math.round(1.1 * ampleTimeMS);
whereExpressions[this.shard] = `if (this._id == ${slowDocId}) {sleep(${sleepTimeMS})};`;
}
@@ -232,26 +260,49 @@ const allShardsSleepFailure = new MultiFailureController([shard0SleepFailure, sh
const allshardsMixedFailures = new MultiFailureController([shard0NetworkFailure, shard1Failpoint]);
-function getMoreShardTimeout(allowPartialResults, failureController) {
+// Invokes 'callback' once with each single-shard failure controller, in this order: the
+// failpoint controllers, the network-delay controllers, then the $where-sleep controllers
+// (shard0 before shard1 in each pair).
+// Due to the hack with sleepFailures below, this has to be the innermost parameterizing function.
+function withEachSingleShardFailure(callback) {
+    callback(shard0Failpoint);
+    callback(shard1Failpoint);
+    callback(shard0NetworkFailure);
+    callback(shard1NetworkFailure);
+    // The FindWhereSleepController must be enabled before the first "find" because that's when
+    // the $where clause is set.
+    shard0SleepFailure.enable();
+    callback(shard0SleepFailure);
+    shard0SleepFailure.disable();
+    shard1SleepFailure.enable();
+    callback(shard1SleepFailure);
+    // Disable the controller that was enabled above. (Previously this disabled
+    // shard1NetworkFailure, which was never enabled here and whose disable() sleeps to flush
+    // delayed messages.)
+    shard1SleepFailure.disable();
+}
+
+// Invokes 'callback' once with each failure controller that affects all shards: failpoints,
+// network failures, $where sleeps, and finally the mixed-failure controller.
+function withEachAllShardFailure(callback) {
+    const allShardControllers = [
+        allShardsFailpoint,
+        allshardsNetworkFailure,
+        allShardsSleepFailure,
+        allshardsMixedFailures,
+    ];
+    for (const controller of allShardControllers) {
+        callback(controller);
+    }
+}
+
+function getMoreShardTimeout(allowPartialResults, failureController, batchSize) {
// Get the first batch.
const res = assert.commandWorked(coll.runCommand({
find: collName,
- filter: {$where: Object.values(whereExpressions).join("") + "return 1;"},
+ filter: {$where: whereCode()},
allowPartialResults: allowPartialResults,
- batchSize: batchSizeForGetMore,
+ batchSize: batchSize,
maxTimeMS: ampleTimeMS
}));
assert.eq(undefined, res.cursor.partialResultsReturned);
assert.gt(res.cursor.id, 0);
// Stop a shard and run getMore.
failureController.enable();
- let numReturned = batchSizeForGetMore; // One batch was returned so far.
+ let numReturned = batchSize; // One batch was returned so far.
print(numReturned + " docs returned in the first batch");
while (true) {
// Run getmores repeatedly until we exhaust the cache on mongos.
// Eventually we should get partial results or an error because a shard is down.
- const res2 = coll.runCommand(
- {getMore: res.cursor.id, collection: collName, batchSize: batchSizeForGetMore});
+ const res2 =
+ coll.runCommand({getMore: res.cursor.id, collection: collName, batchSize: batchSize});
if (allowPartialResults) {
assert.commandWorked(res2);
} else {
@@ -263,10 +314,10 @@ function getMoreShardTimeout(allowPartialResults, failureController) {
}
numReturned += res2.cursor.nextBatch.length;
print(numReturned + " docs returned so far");
- assert.neq(numReturned, size, "Entire collection seemed to be cached by the first find!");
+ assert.neq(numReturned, nDocs, "Entire collection seemed to be cached by the first find!");
if (res2.cursor.partialResultsReturned) {
if (allowPartialResults) {
- assert.lt(numReturned, size);
+ assert.lt(numReturned, nDocs);
break;
} else {
assert(false, "Partial results should not have been allowed.");
@@ -275,29 +326,11 @@ function getMoreShardTimeout(allowPartialResults, failureController) {
}
failureController.disable();
}
-// getMore timeout with allowPartialResults=true.
-getMoreShardTimeout(true, shard0Failpoint);
-getMoreShardTimeout(true, shard1Failpoint);
-getMoreShardTimeout(true, shard0NetworkFailure);
-getMoreShardTimeout(true, shard1NetworkFailure);
-// The FindWhereSleepFailureController must be set before the first "find" because that's when the
-// $where clause is set.
-shard0SleepFailure.enable();
-getMoreShardTimeout(true, shard0SleepFailure);
-shard1SleepFailure.enable();
-getMoreShardTimeout(true, shard1SleepFailure);
-
-// getMore timeout with allowPartialResults=false.
-getMoreShardTimeout(false, shard0Failpoint);
-getMoreShardTimeout(false, shard1Failpoint);
-getMoreShardTimeout(false, shard0NetworkFailure);
-getMoreShardTimeout(false, shard1NetworkFailure);
-// The FindWhereSleepFailureController must be set before the first "find" because that's when the
-// $where clause is set.
shard0SleepFailure.enable();
-getMoreShardTimeout(false, shard0SleepFailure);
-shard1SleepFailure.enable();
-getMoreShardTimeout(false, shard1SleepFailure);
+withEachValueOfAllowPartialResults(
+ allowPartialResults => withEachBatchSize(
+ batchSize => withEachSingleShardFailure(
+ failure => getMoreShardTimeout(allowPartialResults, failure, batchSize))));
// With 'allowPartialResults: true', if a shard times out on the first batch then return
// partial results.
@@ -305,16 +338,11 @@ function partialResultsTrueFirstBatch(failureController) {
failureController.enable();
const res = assert.commandWorked(runQuery(true));
assert(res.cursor.partialResultsReturned);
- assert.eq(res.cursor.firstBatch.length, size / 2);
+ assert.eq(res.cursor.firstBatch.length, nDocs / 2);
assert.eq(0, res.cursor.id);
failureController.disable();
}
-partialResultsTrueFirstBatch(shard0Failpoint);
-partialResultsTrueFirstBatch(shard1Failpoint);
-partialResultsTrueFirstBatch(shard0NetworkFailure);
-partialResultsTrueFirstBatch(shard1NetworkFailure);
-partialResultsTrueFirstBatch(shard0SleepFailure);
-partialResultsTrueFirstBatch(shard1SleepFailure);
+withEachSingleShardFailure(failure => partialResultsTrueFirstBatch(failure));
// With 'allowPartialResults: false', if one shard times out then return a timeout error.
function partialResultsFalseOneFailure(failureController) {
@@ -322,12 +350,7 @@ function partialResultsFalseOneFailure(failureController) {
assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired);
failureController.disable();
}
-partialResultsFalseOneFailure(shard0Failpoint);
-partialResultsFalseOneFailure(shard1Failpoint);
-partialResultsFalseOneFailure(shard0NetworkFailure);
-partialResultsFalseOneFailure(shard1NetworkFailure);
-partialResultsFalseOneFailure(shard0SleepFailure);
-partialResultsFalseOneFailure(shard1SleepFailure);
+withEachSingleShardFailure(failure => partialResultsFalseOneFailure(failure));
// With 'allowPartialResults: false', if both shards time out then return a timeout error.
function allowPartialResultsFalseAllFailed(failureController) {
@@ -335,10 +358,7 @@ function allowPartialResultsFalseAllFailed(failureController) {
assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired);
failureController.disable();
}
-allowPartialResultsFalseAllFailed(allShardsFailpoint);
-allowPartialResultsFalseAllFailed(allshardsNetworkFailure);
-allowPartialResultsFalseAllFailed(allshardsMixedFailures);
-allowPartialResultsFalseAllFailed(allShardsSleepFailure);
+withEachAllShardFailure(failure => allowPartialResultsFalseAllFailed(failure));
// With 'allowPartialResults: true', if both shards time out then return empty "partial" results.
function allowPartialResultsTrueAllFailed(failureController) {
@@ -349,10 +369,7 @@ function allowPartialResultsTrueAllFailed(failureController) {
assert.eq(res.cursor.firstBatch.length, 0);
failureController.disable();
}
-allowPartialResultsTrueAllFailed(allShardsFailpoint);
-allowPartialResultsTrueAllFailed(allshardsNetworkFailure);
-allowPartialResultsTrueAllFailed(allshardsMixedFailures);
-allowPartialResultsTrueAllFailed(allShardsSleepFailure);
+withEachAllShardFailure(failure => allowPartialResultsTrueAllFailed(failure));
st.stop();
}());
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index 444b0615d99..4f2a333c3be 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -99,7 +99,6 @@ ClusterClientCursorImpl::~ClusterClientCursorImpl() {
}
StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
-
invariant(_opCtx);
const auto interruptStatus = _opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
@@ -119,6 +118,8 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
if (next.isOK() && !next.getValue().isEOF()) {
++_numReturnedSoFar;
}
+ // Record if we just got a MaxTimeMSExpired error.
+ _maxTimeMSExpired |= (next.getStatus().code() == ErrorCodes::MaxTimeMSExpired);
return next;
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
index 25c33abb38c..2581aa18116 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl_test.cpp
@@ -154,6 +154,57 @@ TEST_F(ClusterClientCursorImplTest, RemotesExhausted) {
ASSERT_EQ(cursor.getNumReturnedSoFar(), 2LL);
}
+// The mock stage yields one document and then a MaxTimeMSExpired error. The cursor params do not
+// set 'isAllowPartialResults', so the error must surface from next() and the cursor must not
+// report partial results.
+TEST_F(ClusterClientCursorImplTest, RemoteTimeoutPartialResultsDisallowed) {
+    auto mockStage = std::make_unique<RouterStageMock>(_opCtx.get());
+    mockStage->queueResult(BSON("a" << 1));
+    mockStage->queueError(Status(ErrorCodes::MaxTimeMSExpired, "timeout"));
+    mockStage->markRemotesExhausted();
+
+    ClusterClientCursorImpl cursor(_opCtx.get(),
+                                   std::move(mockStage),
+                                   ClusterClientCursorParams(NamespaceString("unused"), {}),
+                                   boost::none);
+    ASSERT_TRUE(cursor.remotesExhausted());
+
+    // First call: the single queued document is returned.
+    auto firstResult = cursor.next();
+    ASSERT_OK(firstResult.getStatus());
+    ASSERT(firstResult.getValue().getResult());
+    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
+    ASSERT_TRUE(cursor.remotesExhausted());
+
+    // Second call: the queued timeout error is returned; it does not count as a returned result.
+    auto secondResult = cursor.next();
+    ASSERT_EQ(secondResult.getStatus().code(), ErrorCodes::MaxTimeMSExpired);
+    ASSERT_TRUE(cursor.remotesExhausted());
+    ASSERT_FALSE(cursor.partialResultsReturned());
+    ASSERT_EQ(cursor.getNumReturnedSoFar(), 1LL);
+}
+
+// Same scenario as the "disallowed" variant (one document, then MaxTimeMSExpired), but with
+// 'isAllowPartialResults' set on the cursor params. next() still returns the timeout error, and
+// afterwards the cursor must report that partial results were returned.
+TEST_F(ClusterClientCursorImplTest, RemoteTimeoutPartialResultsAllowed) {
+    auto mockStage = std::make_unique<RouterStageMock>(_opCtx.get());
+    mockStage->queueResult(BSON("a" << 1));
+    mockStage->queueError(Status(ErrorCodes::MaxTimeMSExpired, "timeout"));
+    mockStage->markRemotesExhausted();
+
+    auto params = ClusterClientCursorParams(NamespaceString("unused"), {});
+    params.isAllowPartialResults = true;
+
+    ClusterClientCursorImpl cursor(
+        _opCtx.get(), std::move(mockStage), std::move(params), boost::none);
+    ASSERT_TRUE(cursor.remotesExhausted());
+
+    // First call: the single queued document is returned.
+    auto firstResult = cursor.next();
+    ASSERT_OK(firstResult.getStatus());
+    ASSERT(firstResult.getValue().getResult());
+    ASSERT_BSONOBJ_EQ(*firstResult.getValue().getResult(), BSON("a" << 1));
+    ASSERT_TRUE(cursor.remotesExhausted());
+
+    // Second call: the queued timeout error is returned and recorded by the cursor, which is
+    // what makes partialResultsReturned() true below.
+    auto secondResult = cursor.next();
+    ASSERT_EQ(secondResult.getStatus().code(), ErrorCodes::MaxTimeMSExpired);
+    ASSERT_TRUE(cursor.remotesExhausted());
+    ASSERT_TRUE(cursor.partialResultsReturned());
+    ASSERT_EQ(cursor.getNumReturnedSoFar(), 1LL);
+}
+
TEST_F(ClusterClientCursorImplTest, ForwardsAwaitDataTimeout) {
auto mockStage = std::make_unique<RouterStageMock>(_opCtx.get());
auto mockStagePtr = mockStage.get();