summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Tarzia <steve.tarzia@mongodb.com>2022-10-26 23:05:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-27 00:12:58 +0000
commite3794c64dfa909a99655c34f5ca591ae56061377 (patch)
tree9c0b11030ef5b2512950910e3aa1cc1fb5b2d491
parent108528c1fdc795fa1f9571f2073668d4fc7f05c3 (diff)
downloadmongo-e3794c64dfa909a99655c34f5ca591ae56061377.tar.gz
SERVER-57469 return partial results when a shard times out
-rw-r--r--jstests/serial_run/allow_partial_results_with_maxTimeMS.js197
-rw-r--r--jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js286
-rw-r--r--src/mongo/s/query/async_results_merger.cpp13
-rw-r--r--src/mongo/s/query/async_results_merger.h5
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp158
-rw-r--r--src/mongo/s/query/blocking_results_merger_test.cpp31
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h3
-rw-r--r--src/mongo/s/query/cluster_find.cpp92
-rw-r--r--src/mongo/s/query/establish_cursors.cpp7
-rw-r--r--src/mongo/s/query/establish_cursors_test.cpp38
11 files changed, 817 insertions, 17 deletions
diff --git a/jstests/serial_run/allow_partial_results_with_maxTimeMS.js b/jstests/serial_run/allow_partial_results_with_maxTimeMS.js
new file mode 100644
index 00000000000..65a10a82f0e
--- /dev/null
+++ b/jstests/serial_run/allow_partial_results_with_maxTimeMS.js
@@ -0,0 +1,197 @@
+/**
+ * SERVER-57469: Test that the 'allowPartialResults' option to find is respected when used together
+ * with 'maxTimeMS' and only a subset of the shards provide data before the timeout.
+ * Does not rely on any failpoints, but tries to create a MaxTimeMSExpired scenario on an unaltered
+ * system. These tests are sensitive to timing; the execution resources (system performance) must
+ * be relatively consistent throughout the test.
+ *
+ * @tags: [
+ * requires_sharding,
+ * requires_replication,
+ * requires_getmore,
+ * requires_fcv_62,
+ * ]
+ */
+(function() {
+"use strict";
+
+function getMillis() {
+ const d = new Date();
+ return d.getTime();
+}
+function runtimeMillis(f) {
+ var start = getMillis();
+ f();
+ return (getMillis() - start);
+}
+function isError(res) {
+ return !res.hasOwnProperty('ok') || !res['ok'];
+}
+
+Random.setRandomSeed();
+
+const dbName = "test-SERVER-57469";
+const collName = "test-SERVER-57469-coll";
+
+// Set up a 2-shard single-node replicaset cluster.
+const st = new ShardingTest({name: jsTestName(), shards: 2, rs: {nodes: 1}});
+
+const coll = st.s0.getDB(dbName)[collName];
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, st.shard0.name);
+
+// Insert some data.
+function initDb(numSamples) {
+ coll.drop();
+
+ // Use ranged sharding with 90% of the value range on the second shard.
+ const splitPoint = Math.max(1, numSamples / 10);
+ st.shardColl(
+ coll,
+ {_id: 1}, // shard key
+ {_id: splitPoint}, // split point
+ {_id: splitPoint + 1} // move the chunk to the other shard
+ );
+
+ let bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < numSamples; i++) {
+ bulk.insert({"_id": i});
+ }
+ assert.commandWorked(bulk.execute());
+}
+
+let nDocs = 1000;
+initDb(nDocs);
+
+/**
+ * @param {Object} cmdRes coll.runCommand() result
+ * @param {int} expectedFullSize of results
+ * @returns {String} "error"|"partial"|"full"
+ */
+function interpretCommandResult(cmdRes, expectedFullSize) {
+ if (isError(cmdRes)) {
+ print(JSON.stringify(cmdRes));
+ assert.eq(ErrorCodes.MaxTimeMSExpired, cmdRes.code); // timeout
+ return "error";
+ }
+ let fetchedSize = (cmdRes.cursor.firstBatch !== undefined) ? cmdRes.cursor.firstBatch.length
+ : cmdRes.cursor.nextBatch.length;
+ if (cmdRes.cursor.partialResultsReturned) {
+ assert.lt(fetchedSize, expectedFullSize);
+ assert.eq(0, cmdRes.cursor.id); // Note: we always see cursor id == 0 with partial results.
+ return "partial";
+ }
+ assert.eq(fetchedSize, expectedFullSize);
+ assert.eq(undefined, cmdRes.cursor.partialResultsReturned);
+ return "full";
+}
+
+function runBigBatchQuery(timeoutMs) {
+ // The batchSize is equal to the full collection size.
+ return interpretCommandResult(
+ coll.runCommand(
+ {find: collName, maxTimeMS: timeoutMs, allowPartialResults: true, batchSize: nDocs}),
+ nDocs);
+}
+
+// Time the full query.
+
+// First, experimentally find an initial timeout value that is just on the threshold of success.
+// Give it practically unlimited time to complete.
+let fullQueryTimeoutMS = runtimeMillis(() => assert.eq("full", runBigBatchQuery(9999999)));
+print("ran in " + fullQueryTimeoutMS + " ms");
+const targetTimeoutMS =
+ 50; // We want the query to run for at least this long, to allow for timeout.
+if (fullQueryTimeoutMS < targetTimeoutMS) {
+ // Assume linear scaling of runtime with the number of docs.
+ nDocs *= Math.ceil(targetTimeoutMS / fullQueryTimeoutMS);
+ // Limit size to prevent long runtime due to bad first sample.
+ nDocs = Math.min(nDocs, 100000);
+ if (nDocs % 2 == 1) { // make sure it's even so the math for half size is easier
+ nDocs += 1;
+ }
+ print("adjusting size to " + nDocs);
+ fullQueryTimeoutMS = 100;
+ initDb(nDocs);
+
+ // Re-time the full query after resizing, with unlimited time allowed.
+ fullQueryTimeoutMS = runtimeMillis(() => assert.eq("full", runBigBatchQuery(9999999)));
+ print("after adjustment, ran in " + fullQueryTimeoutMS + " ms");
+}
+
+/**
+ * @param {int} initialTimeoutMS
+ * @param {function(timeout) --> "error"|"partial"|"full"} queryFunc
+ * @returns timeout that achieved partial results, or fails an assertion if partial results were
+ * never seen.
+ */
+function searchForAndAssertPartialResults(initialTimeoutMS, queryFunc) {
+ // Try this test twice because it's very sensitive to timing and resource contention.
+ for (let i = 1; i <= 2; i++) {
+ let timeoutMS = initialTimeoutMS;
+ const attempts = 20;
+ for (let j = 1; j <= attempts; j++) {
+ print("try query with maxTimeMS: " + timeoutMS);
+ let res = queryFunc(timeoutMS);
+ if (res == "partial") {
+ // Got partial results!
+ return timeoutMS;
+ } else if (res == "full") {
+ // Timeout was so long that we got complete results. Make it shorter and try again
+ if (timeoutMS > 1) { // 1 ms is the min timeout allowed.
+ timeoutMS = Math.floor(0.8 * timeoutMS);
+ }
+ } else {
+ assert.eq("error", res);
+                // Timeout was so short that we got no results. Increase maxTimeMS and try again.
+ timeoutMS = Math.ceil(1.1 * timeoutMS);
+ // Don't let the timeout explode upward without bound.
+ if (timeoutMS > 100 * initialTimeoutMS) {
+ break;
+ }
+ }
+ }
+ // Pause for one minute then try once again. We don't expect to ever reach this except
+ // in rare cases when the test infrastructure is behaving inconsistently. We are trying
+ // the test again after a long delay instead of failing the test.
+ sleep(60 * 1000);
+ }
+ // Failed to ever see partial results :-(
+ if (fullQueryTimeoutMS < 10) {
+        jsTest.log("!!!: This error is likely due to the nDocs constant being set too small.");
+ }
+ assert(false, "Did not find partial results after max number of attempts");
+}
+
+// Try to get partial results, while nudging timeout value around the expected time.
+// This first case will try to get all the results in one big batch.
+// Start with half of the full runtime of the query.
+
+// fetch one big batch of results
+searchForAndAssertPartialResults(Math.round(fullQueryTimeoutMS), runBigBatchQuery);
+
+// Try to get partial results in a getMore, while fetching the second half of data.
+searchForAndAssertPartialResults(Math.round(0.5 * fullQueryTimeoutMS), function(timeout) {
+ // Find a small first batch.
+ const smallBatchSize = 1;
+ let findRes = coll.runCommand(
+ {find: collName, allowPartialResults: true, batchSize: smallBatchSize, maxTimeMS: timeout});
+ if (isError(findRes)) {
+ // We don't expect this first small-batch find to timeout, but it can if we're unlucky.
+ assert.eq(ErrorCodes.MaxTimeMSExpired, findRes.code); // timeout
+ return "error";
+ }
+ // Partial results can be either size zero or smallBatchSize.
+ assert.lte(findRes.cursor.firstBatch.length, smallBatchSize);
+ assert.eq(undefined, findRes.cursor.partialResultsReturned);
+
+ // Try to get partial results with a getMore.
+ const secondBatchSize = nDocs - smallBatchSize;
+ return interpretCommandResult(
+ coll.runCommand(
+ {getMore: findRes.cursor.id, collection: collName, batchSize: secondBatchSize}),
+ secondBatchSize);
+});
+
+st.stop();
+}());
diff --git a/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js b/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js
new file mode 100644
index 00000000000..b7aeddd17c4
--- /dev/null
+++ b/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js
@@ -0,0 +1,286 @@
+/**
+ * SERVER-57469: Test that the 'allowPartialResults' option to find is respected when used together
+ * with 'maxTimeMS' and only a subset of the shards provide data before the timeout.
+ * Uses both failpoints and MongoBridge to simulate MaxTimeMSExpired.
+ *
+ * @tags: [
+ * requires_sharding,
+ * requires_replication,
+ * requires_getmore,
+ * requires_fcv_62,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js"); // for 'configureFailPoint()'
+
+Random.setRandomSeed();
+
+function getMillis() {
+ const d = new Date();
+ return d.getTime();
+}
+function runtimeMillis(f) {
+ var start = getMillis();
+ f();
+ return (getMillis() - start);
+}
+function isError(res) {
+ return !res.hasOwnProperty('ok') || !res['ok'];
+}
+
+// Set up a 2-shard single-node replicaset cluster with MongoBridge.
+const st = new ShardingTest({name: jsTestName(), shards: 2, useBridge: true, rs: {nodes: 1}});
+
+const dbName = "test-SERVER-57469";
+const collName = "test-SERVER-57469-coll";
+
+const coll = st.s0.getDB(dbName)[collName];
+
+function initDb(numSamples) {
+ coll.drop();
+
+ // Use ranged sharding with 50% of the data on the second shard.
+ const splitPoint = Math.max(1, numSamples / 2);
+ st.shardColl(
+ coll,
+ {_id: 1}, // shard key
+ {_id: splitPoint}, // split point
+ {_id: splitPoint + 1} // move the chunk to the other shard
+ );
+
+ let bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < numSamples; i++) {
+ bulk.insert({"_id": i});
+ }
+ assert.commandWorked(bulk.execute());
+}
+
+// Insert some data.
+const size = 1000;
+initDb(size);
+
+function runQueryWithTimeout(doAllowPartialResults, timeout) {
+ return coll.runCommand({
+ find: collName,
+ allowPartialResults: doAllowPartialResults,
+ batchSize: size,
+ maxTimeMS: timeout
+ });
+}
+
+const ampleTimeMS = 10 * runtimeMillis(() => runQueryWithTimeout(true, 999999999));
+print("ampleTimeMS: " + ampleTimeMS);
+
+// Try to fetch all the data in one batch, with ample time allowed.
+function runQuery(doAllowPartialResults) {
+ return runQueryWithTimeout(doAllowPartialResults, ampleTimeMS);
+}
+
+// Simulate mongos timeout during first batch.
+// Shards have no results yet, so we do not return partial results.
+{
+ const fpMongos = configureFailPoint(st.s, "maxTimeAlwaysTimeOut", {}, "alwaysOn");
+ // With 'allowPartialResults: false', if mongos times out then return a timeout error.
+ assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired);
+ // With 'allowPartialResults: true', if mongos times out then return a timeout error.
+ assert.commandFailedWithCode(runQuery(true), ErrorCodes.MaxTimeMSExpired);
+ fpMongos.off();
+}
+
+const batchSizeForGetMore = 10;
+
+// Simulate mongos timeout during getMore.
+function getMoreMongosTimeout(allowPartialResults) {
+ // Get the first batch.
+ const res = assert.commandWorked(coll.runCommand({
+ find: collName,
+ allowPartialResults: allowPartialResults,
+ batchSize: batchSizeForGetMore,
+ maxTimeMS: ampleTimeMS
+ }));
+ assert(!res.cursor.hasOwnProperty("partialResultsReturned"));
+ assert.gt(res.cursor.id, 0);
+ // Stop mongos and run getMore.
+ let fpMongos = configureFailPoint(st.s, "maxTimeAlwaysTimeOut", {}, "alwaysOn");
+
+ // Run getmores repeatedly until we exhaust the cache on mongos.
+ // Eventually we should get either a MaxTimeMS error or partial results because a shard is down.
+ let numReturned = batchSizeForGetMore; // One batch was returned so far.
+ while (true) {
+ const res2 = coll.runCommand(
+ {getMore: res.cursor.id, collection: collName, batchSize: batchSizeForGetMore});
+ if (isError(res2)) {
+ assert.commandFailedWithCode(
+ res2, ErrorCodes.MaxTimeMSExpired, "failure should be due to MaxTimeMSExpired");
+ break;
+ }
+ // Results were cached from the first request. As long as GetMore is not called, these
+ // are returned even if MaxTimeMS expired on mongos.
+ numReturned += res2.cursor.nextBatch.length;
+ print(numReturned + " docs returned so far");
+        assert.neq(numReturned, size, "Got full results even though mongos had MaxTimeMSExpired.");
+ if (res2.cursor.partialResultsReturned) {
+ assert.lt(numReturned, size);
+ break;
+ }
+ }
+ fpMongos.off();
+}
+getMoreMongosTimeout(true);
+getMoreMongosTimeout(false);
+
+// Test shard timeouts. These are the scenario that we expect to be possible in practice.
+// Test using both failpoints and mongo bridge, testing slightly different execution paths.
+
+class MaxTimeMSFailpointFailureController {
+ constructor(mongoInstance) {
+ this.mongoInstance = mongoInstance;
+ this.fp = null;
+ }
+
+ enable() {
+ this.fp = configureFailPoint(this.mongoInstance, "maxTimeAlwaysTimeOut", {}, "alwaysOn");
+ }
+
+ disable() {
+ this.fp.off();
+ }
+}
+
+class NetworkFailureController {
+ constructor(shard) {
+ this.shard = shard;
+ }
+
+ enable() {
+ // Delay messages from mongos to shard so that mongos will see it as having exceeded
+ // MaxTimeMS. The shard process is active and receives the request, but the response is
+ // lost. We delay instead of dropping messages because this lets the shard request proceed
+ // without connection failure and retry (which has its own driver-controlled timeout,
+ // typically 15s).
+ this.shard.getPrimary().delayMessagesFrom(st.s, 2 * ampleTimeMS);
+ }
+
+ disable() {
+ this.shard.getPrimary().delayMessagesFrom(st.s, 0);
+ sleep(2 * ampleTimeMS); // Allow time for delayed messages to be flushed.
+ }
+}
+
+class MultiFailureController {
+ constructor(failureControllerList) {
+ this.controllerList = failureControllerList;
+ }
+
+ enable() {
+ for (const c of this.controllerList) {
+ c.enable();
+ }
+ }
+
+ disable() {
+ for (const c of this.controllerList) {
+ c.disable();
+ }
+ }
+}
+
+const shard0Failpoint = new MaxTimeMSFailpointFailureController(st.shard0);
+const shard1Failpoint = new MaxTimeMSFailpointFailureController(st.shard1);
+const allShardsFailpoint = new MultiFailureController([shard0Failpoint, shard1Failpoint]);
+
+const shard0NetworkFailure = new NetworkFailureController(st.rs0);
+const shard1NetworkFailure = new NetworkFailureController(st.rs1);
+const allshardsNetworkFailure =
+ new MultiFailureController([shard0NetworkFailure, shard1NetworkFailure]);
+
+const allshardsMixedFailures = new MultiFailureController([shard0NetworkFailure, shard1Failpoint]);
+
+// With 'allowPartialResults: true', if a shard times out on getMore then return partial results.
+function partialResultsTrueGetMoreTimeout(failureController) {
+ // Get the first batch.
+ const res = assert.commandWorked(coll.runCommand({
+ find: collName,
+ allowPartialResults: true,
+ batchSize: batchSizeForGetMore,
+ maxTimeMS: ampleTimeMS
+ }));
+ assert.eq(undefined, res.cursor.partialResultsReturned);
+ assert.gt(res.cursor.id, 0);
+ // Stop a shard and run getMore.
+ failureController.enable();
+ let numReturned = batchSizeForGetMore; // One batch was returned so far.
+ print(numReturned + " docs returned in the first batch");
+ while (true) {
+ // Run getmores repeatedly until we exhaust the cache on mongos.
+ // Eventually we should get partial results because a shard is down.
+ const res2 = assert.commandWorked(coll.runCommand(
+ {getMore: res.cursor.id, collection: collName, batchSize: batchSizeForGetMore}));
+ numReturned += res2.cursor.nextBatch.length;
+ print(numReturned + " docs returned so far");
+ assert.neq(numReturned, size, "Entire collection seemed to be cached by the first find!");
+ if (res2.cursor.partialResultsReturned) {
+ assert.lt(numReturned, size);
+ break;
+ }
+ }
+ failureController.disable();
+}
+partialResultsTrueGetMoreTimeout(shard0Failpoint);
+partialResultsTrueGetMoreTimeout(shard1Failpoint);
+partialResultsTrueGetMoreTimeout(shard0NetworkFailure);
+partialResultsTrueGetMoreTimeout(shard1NetworkFailure);
+
+// With 'allowPartialResults: true', if a shard times out on the first batch then return
+// partial results.
+function partialResultsTrueFirstBatch(failureController) {
+ failureController.enable();
+ const res = assert.commandWorked(runQuery(true));
+ assert(res.cursor.partialResultsReturned);
+ assert.eq(res.cursor.firstBatch.length, size / 2);
+ assert.eq(0, res.cursor.id);
+ failureController.disable();
+}
+partialResultsTrueFirstBatch(shard0Failpoint);
+partialResultsTrueFirstBatch(shard1Failpoint);
+partialResultsTrueFirstBatch(shard0NetworkFailure);
+partialResultsTrueFirstBatch(shard1NetworkFailure);
+
+// With 'allowPartialResults: false', if one shard times out then return a timeout error.
+function partialResultsFalseOneFailure(failureController) {
+ failureController.enable();
+ assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired);
+ failureController.disable();
+}
+partialResultsFalseOneFailure(shard0Failpoint);
+partialResultsFalseOneFailure(shard1Failpoint);
+partialResultsFalseOneFailure(shard0NetworkFailure);
+partialResultsFalseOneFailure(shard1NetworkFailure);
+
+// With 'allowPartialResults: false', if both shards time out then return a timeout error.
+function allowPartialResultsFalseAllFailed(failureController) {
+ failureController.enable();
+ assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired);
+ failureController.disable();
+}
+allowPartialResultsFalseAllFailed(allShardsFailpoint);
+allowPartialResultsFalseAllFailed(allshardsNetworkFailure);
+allowPartialResultsFalseAllFailed(allshardsMixedFailures);
+
+// With 'allowPartialResults: true', if both shards time out then return empty "partial" results.
+function allowPartialResultsTrueAllFailed(failureController) {
+ failureController.enable();
+ const res = assert.commandWorked(runQuery(true));
+ assert(res.cursor.partialResultsReturned);
+ assert.eq(0, res.cursor.id);
+ assert.eq(res.cursor.firstBatch.length, 0);
+ failureController.disable();
+}
+allowPartialResultsTrueAllFailed(allShardsFailpoint);
+allowPartialResultsTrueAllFailed(allshardsNetworkFailure);
+allowPartialResultsTrueAllFailed(allshardsMixedFailures);
+
+st.stop();
+}());
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 34c117e6671..30d6b3f16e3 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -500,11 +500,24 @@ Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) {
// Before scheduling more work, check whether the cursor has been invalidated.
_assertNotInvalidated(lk);
+ // Reveal opCtx errors (such as MaxTimeMSExpired) and reflect them in the remote status.
+ invariant(_opCtx, "Cannot schedule a getMore without an OperationContext");
+ const auto interruptStatus = _opCtx->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ for (size_t i = 0; i < _remotes.size(); ++i) {
+ if (!_remotes[i].exhausted()) {
+ _cleanUpFailedBatch(lk, interruptStatus, i);
+ }
+ }
+ return interruptStatus;
+ }
+
// Schedule remote work on hosts for which we need more results.
for (size_t i = 0; i < _remotes.size(); ++i) {
auto& remote = _remotes[i];
if (!remote.status.isOK()) {
+ _cleanUpFailedBatch(lk, remote.status, i);
return remote.status;
}
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 9b5505ddc36..de8bb25dbd6 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -299,8 +299,9 @@ private:
// The identity of the shard which the cursor belongs to.
ShardId shardId;
- // This flag is set if the connection to the remote shard was lost, or never established in
- // the first place. Only applicable if the 'allowPartialResults' option is enabled.
+ // This flag is set if the connection to the remote shard was lost or never established in
+ // the first place or the connection is interrupted due to MaxTimeMSExpired.
+ // Only applicable if the 'allowPartialResults' option is enabled.
bool partialResultsReturned = false;
// The buffer of results that have been retrieved but not yet returned to the caller.
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index fd5be3317f9..f92195f2925 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -978,6 +978,60 @@ TEST_F(AsyncResultsMergerTest, KillCursorCmdHasNoTimeout) {
killFuture.wait();
}
+TEST_F(AsyncResultsMergerTest, KillBeforeNext) {
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
+
+ // Mark the OperationContext as killed from this thread.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->markKilled(ErrorCodes::Interrupted);
+ }
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto nextStatus = arm->nextEvent();
+ ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+ });
+
+ // Wait for the merger to be interrupted.
+ future.default_timed_get();
+
+ // Kill should complete.
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
+}
+
+TEST_F(AsyncResultsMergerTest, KillBeforeNextWithTwoRemotes) {
+ std::vector<RemoteCursor> cursors;
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors));
+
+ // Mark the OperationContext as killed from this thread.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->markKilled(ErrorCodes::Interrupted);
+ }
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto nextStatus = arm->nextEvent();
+ ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+ });
+
+ // Wait for the merger to be interrupted.
+ future.default_timed_get();
+
+ // Kill should complete.
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
+}
+
TEST_F(AsyncResultsMergerTest, TailableBasic) {
BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}");
std::vector<RemoteCursor> cursors;
@@ -1256,6 +1310,110 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) {
ASSERT_TRUE(arm->ready());
}
+TEST_F(AsyncResultsMergerTest, MaxTimeMSExpiredAllowPartialResultsTrue) {
+ BSONObj findCmd = BSON("find"
+ << "testcoll"
+ << "allowPartialResults" << true);
+ std::vector<RemoteCursor> cursors;
+ // Two shards.
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // First host returns single result.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
+ responses.emplace_back(kTestNss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses));
+
+ // From the second host we get a MaxTimeMSExpired error.
+ scheduleErrorResponse({ErrorCodes::MaxTimeMSExpired, "MaxTimeMSExpired"});
+
+ executor()->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->partialResultsReturned());
+ ASSERT_TRUE(arm->remotesExhausted());
+ ASSERT_TRUE(arm->ready());
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
+ ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF());
+}
+
+TEST_F(AsyncResultsMergerTest, MaxTimeMSExpiredAllowPartialResultsFalse) {
+ BSONObj findCmd = BSON("find"
+ << "testcoll"
+ << "allowPartialResults" << false);
+ std::vector<RemoteCursor> cursors;
+ // two shards
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // First host returns single result
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
+ responses.emplace_back(kTestNss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses));
+
+ // From the second host we get a MaxTimeMSExpired error.
+ scheduleErrorResponse({ErrorCodes::MaxTimeMSExpired, "MaxTimeMSExpired"});
+
+ executor()->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ auto statusWithNext = arm->nextReady();
+ ASSERT(!statusWithNext.isOK());
+ ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::MaxTimeMSExpired);
+ // Required to kill the 'arm' on error before destruction.
+ auto killFuture = arm->kill(operationContext());
+ killFuture.wait();
+}
+
+TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnMaxTimeMSExpiredThenLateData) {
+ BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}");
+ std::vector<RemoteCursor> cursors;
+ // two shards
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ cursors.push_back(
+ makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {})));
+ auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd);
+
+ ASSERT_FALSE(arm->ready());
+ auto readyEvent = unittest::assertGet(arm->nextEvent());
+ ASSERT_FALSE(arm->ready());
+
+ // From the first host we get a MaxTimeMSExpired error.
+ scheduleErrorResponse({ErrorCodes::MaxTimeMSExpired, "MaxTimeMSExpired"});
+
+ // Second host returns single result *after* first host times out.
+ std::vector<CursorResponse> responses;
+ std::vector<BSONObj> batch = {fromjson("{_id: 1}")};
+ responses.emplace_back(kTestNss, CursorId(0), batch);
+ scheduleNetworkResponses(std::move(responses));
+
+ executor()->waitForEvent(readyEvent);
+
+ ASSERT_TRUE(arm->ready());
+ ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult());
+ ASSERT_TRUE(arm->partialResultsReturned());
+ ASSERT_TRUE(arm->remotesExhausted());
+ ASSERT_TRUE(arm->ready());
+}
+
TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) {
BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}");
std::vector<RemoteCursor> cursors;
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
index 15e37b0460d..5871d49d4ac 100644
--- a/src/mongo/s/query/blocking_results_merger_test.cpp
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -196,6 +196,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptibleDuringBlockingNext) {
ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
});
+    // Sleep to allow the blockingMerger.next() thread to start. If the thread has not started
+    // even after the sleep, then it's the same test as ShouldBeInterruptibleBeforeBlockingNext.
+ sleepmillis(10);
+
// Now mark the OperationContext as killed from this thread.
{
stdx::lock_guard<Client> lk(*operationContext()->getClient());
@@ -220,6 +224,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptibleDuringBlockingNext) {
future.default_timed_get();
}
+TEST_F(ResultsMergerTestFixture, ShouldBeInterruptibleBeforeBlockingNext) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ BlockingResultsMerger blockingMerger(
+ operationContext(), std::move(params), executor(), nullptr);
+
+ // Mark the OperationContext as killed from this thread.
+ {
+ stdx::lock_guard<Client> lk(*operationContext()->getClient());
+ operationContext()->markKilled(ErrorCodes::Interrupted);
+ }
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto nextStatus = blockingMerger.next(operationContext());
+ ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted);
+ });
+
+ // Wait for the merger to be interrupted.
+ future.default_timed_get();
+
+ // Kill should complete.
+ blockingMerger.kill(operationContext());
+}
+
TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) {
class ThrowyResourceYielder : public ResourceYielder {
public:
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index b08a807788c..444b0615d99 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -103,6 +103,7 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() {
invariant(_opCtx);
const auto interruptStatus = _opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
+ _maxTimeMSExpired |= (interruptStatus.code() == ErrorCodes::MaxTimeMSExpired);
return interruptStatus;
}
@@ -156,7 +157,8 @@ const PrivilegeVector& ClusterClientCursorImpl::getOriginatingPrivileges() const
}
bool ClusterClientCursorImpl::partialResultsReturned() const {
- return _root->partialResultsReturned();
+ // We may have timed out in this layer, or within the plan tree waiting for results from shards.
+ return (_maxTimeMSExpired && _params.isAllowPartialResults) || _root->partialResultsReturned();
}
std::size_t ClusterClientCursorImpl::getNumRemotes() const {
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index 2529254cfce..4020918761c 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -177,6 +177,9 @@ private:
// The number of batches returned by this cursor.
std::uint64_t _nBatchesReturned = 0;
+
+ // Whether ClusterClientCursor::next() was interrupted due to MaxTimeMSExpired.
+ bool _maxTimeMSExpired = false;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 8c3ba12e097..77d9a818f7b 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -314,13 +314,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
establishCursorsOnShards({cm.dbPrimary()});
MONGO_UNREACHABLE;
}
-
throw;
}
-
// Determine whether the cursor we may eventually register will be single- or multi-target.
-
const auto cursorType = params.remotes.size() > 1
? ClusterCursorManager::CursorType::MultiTarget
: ClusterCursorManager::CursorType::SingleTarget;
@@ -335,7 +332,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
}
// Transfer the established cursors to a ClusterClientCursor.
-
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
@@ -343,16 +339,66 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
FindCommon::waitInFindBeforeMakingBatch(opCtx, query);
+ // If we're allowing partial results and we got MaxTimeMSExpired, then temporarily disable
+ // interrupts in the opCtx so that we can pull already-fetched data from ClusterClientCursor.
+ bool ignoringInterrupts = false;
+ if (findCommand.getAllowPartialResults() &&
+ opCtx->checkForInterruptNoAssert().code() == ErrorCodes::MaxTimeMSExpired) {
+ // MaxTimeMS is expired, but perhaps the remotes have not expired their requests yet.
+ // Wait for all remote cursors to be exhausted so that we can safely disable interrupts
+ // in the opCtx. We want to be sure that later calls to ccc->next() do not block on
+ // more data.
+
+ // Maximum number of 1ms sleeps to wait for remote cursors to be exhausted.
+ constexpr int kMaxAttempts = 10;
+ for (int remainingAttempts = kMaxAttempts; !ccc->remotesExhausted(); remainingAttempts--) {
+ if (!remainingAttempts) {
+ LOGV2_DEBUG(
+ 5746900,
+ 0,
+ "MaxTimeMSExpired error was seen on the router, but partial results cannot be "
+ "returned because the remotes did not give the expected MaxTimeMS error within "
+ "kMaxAttempts.");
+ // Reveal the MaxTimeMSExpired error.
+ opCtx->checkForInterrupt();
+ }
+ stdx::this_thread::sleep_for(stdx::chrono::milliseconds(1));
+ }
+
+ // The first MaxTimeMSExpired will have called opCtx->markKilled() so any later
+ // call to opCtx->checkForInterruptNoAssert() will return an error. We need to
+ // temporarily ignore this while we pull data from the ClusterClientCursor.
+ LOGV2_DEBUG(
+ 5746901,
+ 0,
+ "Attempting to return partial results because MaxTimeMS expired and the query set "
+ "AllowPartialResults. Temporarily disabling interrupts on the OperationContext "
+ "while partial results are pulled from the ClusterClientCursor.");
+ opCtx->setIgnoreInterruptsExceptForReplStateChange(true);
+ ignoringInterrupts = true;
+ }
+
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
size_t bytesBuffered = 0;
- // This loop will not result in actually calling getMore against shards, but just loading
- // results from the initial batches (that were obtained while establishing cursors) into
- // 'results'.
+ // This loop will load enough results from the shards for a full first batch. At first, these
+ // results come from the initial batches that were obtained when establishing cursors, but
+ // ClusterClientCursor::next will fetch further results if necessary.
while (!FindCommon::enoughForFirstBatch(findCommand, results->size())) {
- auto next = uassertStatusOK(ccc->next());
-
- if (next.isEOF()) {
+ auto nextWithStatus = ccc->next();
+ if (findCommand.getAllowPartialResults() &&
+ (nextWithStatus.getStatus() == ErrorCodes::MaxTimeMSExpired)) {
+ if (ccc->remotesExhausted()) {
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ break;
+ }
+ // Continue because there may be results waiting from other remotes.
+ continue;
+ } else {
+ // All error statuses besides permissible remote timeouts should be returned to the user.
+ uassertStatusOK(nextWithStatus);
+ }
+ if (nextWithStatus.getValue().isEOF()) {
// We reached end-of-stream. If the cursor is not tailable, then we mark it as
// exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even
// when we reach end-of-stream. However, if all the remote cursors are exhausted, there
@@ -363,7 +409,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
break;
}
- auto nextObj = *next.getResult();
+ auto nextObj = *(nextWithStatus.getValue().getResult());
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
@@ -378,6 +424,19 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
results->push_back(std::move(nextObj));
}
+ if (ignoringInterrupts) {
+ opCtx->setIgnoreInterruptsExceptForReplStateChange(false);
+ ignoringInterrupts = false;
+ LOGV2_DEBUG(5746902, 0, "Re-enabled interrupts on the OperationContext.");
+ }
+
+ // Surface any opCtx interrupts, except ignore MaxTimeMSExpired with allowPartialResults.
+ auto interruptStatus = opCtx->checkForInterruptNoAssert();
+ if (!(interruptStatus.code() == ErrorCodes::MaxTimeMSExpired &&
+ findCommand.getAllowPartialResults())) {
+ uassertStatusOK(interruptStatus);
+ }
+
ccc->detachFromOperationContext();
if (findCommand.getSingleBatch() && !ccc->isTailable()) {
@@ -390,6 +449,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
// If the caller wants to know whether the cursor returned partial results, set it here.
if (partialResultsReturned) {
+ // Missing results can come either from the first batches or from the ccc's later batches.
*partialResultsReturned = ccc->partialResultsReturned();
}
@@ -811,6 +871,16 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
}
if (!next.isOK()) {
+ if (next.getStatus() == ErrorCodes::MaxTimeMSExpired &&
+ pinnedCursor.getValue()->partialResultsReturned()) {
+ // Break to return partial results rather than return a MaxTimeMSExpired error
+ cursorState = ClusterCursorManager::CursorState::Exhausted;
+ LOGV2_DEBUG(5746903,
+ 0,
+ "Attempting to return partial results because MaxTimeMS expired and "
+ "the query set AllowPartialResults.");
+ break;
+ }
return next.getStatus();
}
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index e3de9821128..21219f3db84 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -270,10 +270,11 @@ void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& resp
return;
}
- // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica
- // sets can also throw FailedToSatisfyReadPreference, so we swallow it too.
+ // If '_allowPartialResults' is true then swallow retriable errors, MaxTimeMSExpired, and
+ // FailedToSatisfyReadPreference errors we might get when targeting shard replica sets.
bool isEligibleException = (isMongosRetriableError(status.code()) ||
- status.code() == ErrorCodes::FailedToSatisfyReadPreference);
+ status.code() == ErrorCodes::FailedToSatisfyReadPreference ||
+ status.code() == ErrorCodes::MaxTimeMSExpired);
if (_allowPartialResults && isEligibleException) {
// This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an
// empty HostAndPort, and which has the 'partialResultsReturned' flag set to true.
diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp
index 1c73abf55a2..9e18a623034 100644
--- a/src/mongo/s/query/establish_cursors_test.cpp
+++ b/src/mongo/s/query/establish_cursors_test.cpp
@@ -692,6 +692,44 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrorsAllo
future.default_timed_get();
}
+TEST_F(EstablishCursorsTest, MultipleRemotesAllMaxOutRetriableErrorsAllowPartialResults) {
+ BSONObj cmdObj = fromjson("{find: 'testcoll'}");
+ std::vector<std::pair<ShardId, BSONObj>> remotes{
+ {kTestShardIds[0], cmdObj}, {kTestShardIds[1], cmdObj}, {kTestShardIds[2], cmdObj}};
+
+ // Establishing a cursor fails on all three remotes after maxing out retriable errors, but
+ // because allowPartialResults is true this does not return an error to the caller.
+ auto future = launchAsync([&] {
+ auto cursors = establishCursors(operationContext(),
+ executor(),
+ _nss,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ remotes,
+ true); // allowPartialResults
+ // allowPartialResults is true so ignore the fact that all remotes will have failed
+ // to establish a cursor due to maxing out retriable errors. Each cursor entry
+ // is marked as 'partialResultsReturned:true', with a CursorId of 0 and no HostAndPort.
+ ASSERT_EQ(remotes.size(), cursors.size());
+ for (auto&& cursor : cursors) {
+ ASSERT(cursor.getHostAndPort().empty());
+ ASSERT(cursor.getCursorResponse().getPartialResultsReturned());
+ ASSERT_EQ(cursor.getCursorResponse().getCursorId(), CursorId{0});
+ }
+ });
+
+ // All remotes always respond with retriable errors.
+ for (auto it = remotes.begin(); it != remotes.end(); ++it) {
+ for (int i = 0; i < kMaxRetries + 1; ++i) {
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData());
+ return Status(ErrorCodes::HostUnreachable, "host unreachable");
+ });
+ }
+ }
+
+ future.default_timed_get();
+}
+
TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) {
BSONObj cmdObj = fromjson("{find: 'testcoll'}");
std::vector<std::pair<ShardId, BSONObj>> remotes{