diff options
author | Steve Tarzia <steve.tarzia@mongodb.com> | 2022-10-26 23:05:13 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-27 00:12:58 +0000 |
commit | e3794c64dfa909a99655c34f5ca591ae56061377 (patch) | |
tree | 9c0b11030ef5b2512950910e3aa1cc1fb5b2d491 | |
parent | 108528c1fdc795fa1f9571f2073668d4fc7f05c3 (diff) | |
download | mongo-e3794c64dfa909a99655c34f5ca591ae56061377.tar.gz |
SERVER-57469 return partial results when a shard times out
-rw-r--r-- | jstests/serial_run/allow_partial_results_with_maxTimeMS.js | 197 | ||||
-rw-r--r-- | jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js | 286 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 13 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 5 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 158 | ||||
-rw-r--r-- | src/mongo/s/query/blocking_results_merger_test.cpp | 31 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 92 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors_test.cpp | 38 |
11 files changed, 817 insertions, 17 deletions
diff --git a/jstests/serial_run/allow_partial_results_with_maxTimeMS.js b/jstests/serial_run/allow_partial_results_with_maxTimeMS.js new file mode 100644 index 00000000000..65a10a82f0e --- /dev/null +++ b/jstests/serial_run/allow_partial_results_with_maxTimeMS.js @@ -0,0 +1,197 @@ +/** + * SERVER-57469: Test that the 'allowPartialResults' option to find is respected when used together + * with 'maxTimeMS' and only a subset of the shards provide data before the timeout. + * Does not rely on any failpoints, but tries to create a MaxTimeMSExpired scenario on an unaltered + * system. These tests are sensitive to timing; the execution resources (system performance) must + * be relativlitey consistent throughout the test. + * + * @tags: [ + * requires_sharding, + * requires_replication, + * requires_getmore, + * requires_fcv_62, + * ] + */ +(function() { +"use strict"; + +function getMillis() { + const d = new Date(); + return d.getTime(); +} +function runtimeMillis(f) { + var start = getMillis(); + f(); + return (getMillis() - start); +} +function isError(res) { + return !res.hasOwnProperty('ok') || !res['ok']; +} + +Random.setRandomSeed(); + +const dbName = "test-SERVER-57469"; +const collName = "test-SERVER-57469-coll"; + +// Set up a 2-shard single-node replicaset cluster. +const st = new ShardingTest({name: jsTestName(), shards: 2, rs: {nodes: 1}}); + +const coll = st.s0.getDB(dbName)[collName]; +assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); +st.ensurePrimaryShard(dbName, st.shard0.name); + +// Insert some data. +function initDb(numSamples) { + coll.drop(); + + // Use ranged sharding with 90% of the value range on the second shard. 
+ const splitPoint = Math.max(1, numSamples / 10); + st.shardColl( + coll, + {_id: 1}, // shard key + {_id: splitPoint}, // split point + {_id: splitPoint + 1} // move the chunk to the other shard + ); + + let bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < numSamples; i++) { + bulk.insert({"_id": i}); + } + assert.commandWorked(bulk.execute()); +} + +let nDocs = 1000; +initDb(nDocs); + +/** + * @param {Object} cmdRes coll.runCommand() result + * @param {int} expectedFullSize of results + * @returns {String} "error"|"partial"|"full" + */ +function interpretCommandResult(cmdRes, expectedFullSize) { + if (isError(cmdRes)) { + print(JSON.stringify(cmdRes)); + assert.eq(ErrorCodes.MaxTimeMSExpired, cmdRes.code); // timeout + return "error"; + } + let fetchedSize = (cmdRes.cursor.firstBatch !== undefined) ? cmdRes.cursor.firstBatch.length + : cmdRes.cursor.nextBatch.length; + if (cmdRes.cursor.partialResultsReturned) { + assert.lt(fetchedSize, expectedFullSize); + assert.eq(0, cmdRes.cursor.id); // Note: we always see cursor id == 0 with partial results. + return "partial"; + } + assert.eq(fetchedSize, expectedFullSize); + assert.eq(undefined, cmdRes.cursor.partialResultsReturned); + return "full"; +} + +function runBigBatchQuery(timeoutMs) { + // The batchSize is equal to the full collection size. + return interpretCommandResult( + coll.runCommand( + {find: collName, maxTimeMS: timeoutMs, allowPartialResults: true, batchSize: nDocs}), + nDocs); +} + +// Time the full query. + +// First, experimentally find an initial timeout value that is just on the threshold of success. +// Give it practically unlimited time to complete. +let fullQueryTimeoutMS = runtimeMillis(() => assert.eq("full", runBigBatchQuery(9999999))); +print("ran in " + fullQueryTimeoutMS + " ms"); +const targetTimeoutMS = + 50; // We want the query to run for at least this long, to allow for timeout. 
+if (fullQueryTimeoutMS < targetTimeoutMS) { + // Assume linear scaling of runtime with the number of docs. + nDocs *= Math.ceil(targetTimeoutMS / fullQueryTimeoutMS); + // Limit size to prevent long runtime due to bad first sample. + nDocs = Math.min(nDocs, 100000); + if (nDocs % 2 == 1) { // make sure it's even so the math for half size is easier + nDocs += 1; + } + print("adjusting size to " + nDocs); + fullQueryTimeoutMS = 100; + initDb(nDocs); + + // Re-time the full query after resizing, with unlimited time allowed. + fullQueryTimeoutMS = runtimeMillis(() => assert.eq("full", runBigBatchQuery(9999999))); + print("after adjustment, ran in " + fullQueryTimeoutMS + " ms"); +} + +/** + * @param {int} initialTimeoutMS + * @param {function(timeout) --> "error"|"partial"|"full"} queryFunc + * @returns timeout that achieved partial results, or fails an assertion if partial results were + * never seen. + */ +function searchForAndAssertPartialResults(initialTimeoutMS, queryFunc) { + // Try this test twice because it's very sensitive to timing and resource contention. + for (let i = 1; i <= 2; i++) { + let timeoutMS = initialTimeoutMS; + const attempts = 20; + for (let j = 1; j <= attempts; j++) { + print("try query with maxTimeMS: " + timeoutMS); + let res = queryFunc(timeoutMS); + if (res == "partial") { + // Got partial results! + return timeoutMS; + } else if (res == "full") { + // Timeout was so long that we got complete results. Make it shorter and try again + if (timeoutMS > 1) { // 1 ms is the min timeout allowed. + timeoutMS = Math.floor(0.8 * timeoutMS); + } + } else { + assert.eq("error", res); + // Timeout was so short that we go no results. Increase maxTimeMS and try again + timeoutMS = Math.ceil(1.1 * timeoutMS); + // Don't let the timeout explode upward without bound. + if (timeoutMS > 100 * initialTimeoutMS) { + break; + } + } + } + // Pause for one minute then try once again. 
We don't expect to ever reach this except + // in rare cases when the test infrastructure is behaving inconsistently. We are trying + // the test again after a long delay instead of failing the test. + sleep(60 * 1000); + } + // Failed to ever see partial results :-( + if (fullQueryTimeoutMS < 10) { + lsTest.log("!!!: This error is likely due to the nDocs constant being set too small."); + } + assert(false, "Did not find partial results after max number of attempts"); +} + +// Try to get partial results, while nudging timeout value around the expected time. +// This first case will try to get all the results in one big batch. +// Start with half of the full runtime of the query. + +// fetch one big batch of results +searchForAndAssertPartialResults(Math.round(fullQueryTimeoutMS), runBigBatchQuery); + +// Try to get partial results in a getMore, while fetching the second half of data. +searchForAndAssertPartialResults(Math.round(0.5 * fullQueryTimeoutMS), function(timeout) { + // Find a small first batch. + const smallBatchSize = 1; + let findRes = coll.runCommand( + {find: collName, allowPartialResults: true, batchSize: smallBatchSize, maxTimeMS: timeout}); + if (isError(findRes)) { + // We don't expect this first small-batch find to timeout, but it can if we're unlucky. + assert.eq(ErrorCodes.MaxTimeMSExpired, findRes.code); // timeout + return "error"; + } + // Partial results can be either size zero or smallBatchSize. + assert.lte(findRes.cursor.firstBatch.length, smallBatchSize); + assert.eq(undefined, findRes.cursor.partialResultsReturned); + + // Try to get partial results with a getMore. 
+ const secondBatchSize = nDocs - smallBatchSize; + return interpretCommandResult( + coll.runCommand( + {getMore: findRes.cursor.id, collection: collName, batchSize: secondBatchSize}), + secondBatchSize); +}); + +st.stop(); +}()); diff --git a/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js b/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js new file mode 100644 index 00000000000..b7aeddd17c4 --- /dev/null +++ b/jstests/sharding/allow_partial_results_with_maxTimeMS_failpoints.js @@ -0,0 +1,286 @@ +/** + * SERVER-57469: Test that the 'allowPartialResults' option to find is respected when used together + * with 'maxTimeMS' and only a subset of the shards provide data before the timeout. + * Uses both failpoints and MongoBridge to simulate MaxTimeMSExpired. + * + * @tags: [ + * requires_sharding, + * requires_replication, + * requires_getmore, + * requires_fcv_62, + * ] + */ +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); // for 'configureFailPoint()' + +Random.setRandomSeed(); + +function getMillis() { + const d = new Date(); + return d.getTime(); +} +function runtimeMillis(f) { + var start = getMillis(); + f(); + return (getMillis() - start); +} +function isError(res) { + return !res.hasOwnProperty('ok') || !res['ok']; +} + +// Set up a 2-shard single-node replicaset cluster with MongoBridge. +const st = new ShardingTest({name: jsTestName(), shards: 2, useBridge: true, rs: {nodes: 1}}); + +const dbName = "test-SERVER-57469"; +const collName = "test-SERVER-57469-coll"; + +const coll = st.s0.getDB(dbName)[collName]; + +function initDb(numSamples) { + coll.drop(); + + // Use ranged sharding with 50% of the data on the second shard. 
+ const splitPoint = Math.max(1, numSamples / 2); + st.shardColl( + coll, + {_id: 1}, // shard key + {_id: splitPoint}, // split point + {_id: splitPoint + 1} // move the chunk to the other shard + ); + + let bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < numSamples; i++) { + bulk.insert({"_id": i}); + } + assert.commandWorked(bulk.execute()); +} + +// Insert some data. +const size = 1000; +initDb(size); + +function runQueryWithTimeout(doAllowPartialResults, timeout) { + return coll.runCommand({ + find: collName, + allowPartialResults: doAllowPartialResults, + batchSize: size, + maxTimeMS: timeout + }); +} + +const ampleTimeMS = 10 * runtimeMillis(() => runQueryWithTimeout(true, 999999999)); +print("ampleTimeMS: " + ampleTimeMS); + +// Try to fetch all the data in one batch, with ample time allowed. +function runQuery(doAllowPartialResults) { + return runQueryWithTimeout(doAllowPartialResults, ampleTimeMS); +} + +// Simulate mongos timeout during first batch. +// Shards have no results yet, so we do not return partial results. +{ + const fpMongos = configureFailPoint(st.s, "maxTimeAlwaysTimeOut", {}, "alwaysOn"); + // With 'allowPartialResults: false', if mongos times out then return a timeout error. + assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired); + // With 'allowPartialResults: true', if mongos times out then return a timeout error. + assert.commandFailedWithCode(runQuery(true), ErrorCodes.MaxTimeMSExpired); + fpMongos.off(); +} + +const batchSizeForGetMore = 10; + +// Simulate mongos timeout during getMore. +function getMoreMongosTimeout(allowPartialResults) { + // Get the first batch. + const res = assert.commandWorked(coll.runCommand({ + find: collName, + allowPartialResults: allowPartialResults, + batchSize: batchSizeForGetMore, + maxTimeMS: ampleTimeMS + })); + assert(!res.cursor.hasOwnProperty("partialResultsReturned")); + assert.gt(res.cursor.id, 0); + // Stop mongos and run getMore. 
+ let fpMongos = configureFailPoint(st.s, "maxTimeAlwaysTimeOut", {}, "alwaysOn"); + + // Run getmores repeatedly until we exhaust the cache on mongos. + // Eventually we should get either a MaxTimeMS error or partial results because a shard is down. + let numReturned = batchSizeForGetMore; // One batch was returned so far. + while (true) { + const res2 = coll.runCommand( + {getMore: res.cursor.id, collection: collName, batchSize: batchSizeForGetMore}); + if (isError(res2)) { + assert.commandFailedWithCode( + res2, ErrorCodes.MaxTimeMSExpired, "failure should be due to MaxTimeMSExpired"); + break; + } + // Results were cached from the first request. As long as GetMore is not called, these + // are returned even if MaxTimeMS expired on mongos. + numReturned += res2.cursor.nextBatch.length; + print(numReturned + " docs returned so far"); + assert.neq(numReturned, size, "Got full results even through mongos had MaxTimeMSExpired."); + if (res2.cursor.partialResultsReturned) { + assert.lt(numReturned, size); + break; + } + } + fpMongos.off(); +} +getMoreMongosTimeout(true); +getMoreMongosTimeout(false); + +// Test shard timeouts. These are the scenario that we expect to be possible in practice. +// Test using both failpoints and mongo bridge, testing slightly different execution paths. + +class MaxTimeMSFailpointFailureController { + constructor(mongoInstance) { + this.mongoInstance = mongoInstance; + this.fp = null; + } + + enable() { + this.fp = configureFailPoint(this.mongoInstance, "maxTimeAlwaysTimeOut", {}, "alwaysOn"); + } + + disable() { + this.fp.off(); + } +} + +class NetworkFailureController { + constructor(shard) { + this.shard = shard; + } + + enable() { + // Delay messages from mongos to shard so that mongos will see it as having exceeded + // MaxTimeMS. The shard process is active and receives the request, but the response is + // lost. 
We delay instead of dropping messages because this lets the shard request proceed + // without connection failure and retry (which has its own driver-controlled timeout, + // typically 15s). + this.shard.getPrimary().delayMessagesFrom(st.s, 2 * ampleTimeMS); + } + + disable() { + this.shard.getPrimary().delayMessagesFrom(st.s, 0); + sleep(2 * ampleTimeMS); // Allow time for delayed messages to be flushed. + } +} + +class MultiFailureController { + constructor(failureControllerList) { + this.controllerList = failureControllerList; + } + + enable() { + for (const c of this.controllerList) { + c.enable(); + } + } + + disable() { + for (const c of this.controllerList) { + c.disable(); + } + } +} + +const shard0Failpoint = new MaxTimeMSFailpointFailureController(st.shard0); +const shard1Failpoint = new MaxTimeMSFailpointFailureController(st.shard1); +const allShardsFailpoint = new MultiFailureController([shard0Failpoint, shard1Failpoint]); + +const shard0NetworkFailure = new NetworkFailureController(st.rs0); +const shard1NetworkFailure = new NetworkFailureController(st.rs1); +const allshardsNetworkFailure = + new MultiFailureController([shard0NetworkFailure, shard1NetworkFailure]); + +const allshardsMixedFailures = new MultiFailureController([shard0NetworkFailure, shard1Failpoint]); + +// With 'allowPartialResults: true', if a shard times out on getMore then return partial results. +function partialResultsTrueGetMoreTimeout(failureController) { + // Get the first batch. + const res = assert.commandWorked(coll.runCommand({ + find: collName, + allowPartialResults: true, + batchSize: batchSizeForGetMore, + maxTimeMS: ampleTimeMS + })); + assert.eq(undefined, res.cursor.partialResultsReturned); + assert.gt(res.cursor.id, 0); + // Stop a shard and run getMore. + failureController.enable(); + let numReturned = batchSizeForGetMore; // One batch was returned so far. 
+ print(numReturned + " docs returned in the first batch"); + while (true) { + // Run getmores repeatedly until we exhaust the cache on mongos. + // Eventually we should get partial results because a shard is down. + const res2 = assert.commandWorked(coll.runCommand( + {getMore: res.cursor.id, collection: collName, batchSize: batchSizeForGetMore})); + numReturned += res2.cursor.nextBatch.length; + print(numReturned + " docs returned so far"); + assert.neq(numReturned, size, "Entire collection seemed to be cached by the first find!"); + if (res2.cursor.partialResultsReturned) { + assert.lt(numReturned, size); + break; + } + } + failureController.disable(); +} +partialResultsTrueGetMoreTimeout(shard0Failpoint); +partialResultsTrueGetMoreTimeout(shard1Failpoint); +partialResultsTrueGetMoreTimeout(shard0NetworkFailure); +partialResultsTrueGetMoreTimeout(shard1NetworkFailure); + +// With 'allowPartialResults: true', if a shard times out on the first batch then return +// partial results. +function partialResultsTrueFirstBatch(failureController) { + failureController.enable(); + const res = assert.commandWorked(runQuery(true)); + assert(res.cursor.partialResultsReturned); + assert.eq(res.cursor.firstBatch.length, size / 2); + assert.eq(0, res.cursor.id); + failureController.disable(); +} +partialResultsTrueFirstBatch(shard0Failpoint); +partialResultsTrueFirstBatch(shard1Failpoint); +partialResultsTrueFirstBatch(shard0NetworkFailure); +partialResultsTrueFirstBatch(shard1NetworkFailure); + +// With 'allowPartialResults: false', if one shard times out then return a timeout error. 
+function partialResultsFalseOneFailure(failureController) { + failureController.enable(); + assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired); + failureController.disable(); +} +partialResultsFalseOneFailure(shard0Failpoint); +partialResultsFalseOneFailure(shard1Failpoint); +partialResultsFalseOneFailure(shard0NetworkFailure); +partialResultsFalseOneFailure(shard1NetworkFailure); + +// With 'allowPartialResults: false', if both shards time out then return a timeout error. +function allowPartialResultsFalseAllFailed(failureController) { + failureController.enable(); + assert.commandFailedWithCode(runQuery(false), ErrorCodes.MaxTimeMSExpired); + failureController.disable(); +} +allowPartialResultsFalseAllFailed(allShardsFailpoint); +allowPartialResultsFalseAllFailed(allshardsNetworkFailure); +allowPartialResultsFalseAllFailed(allshardsMixedFailures); + +// With 'allowPartialResults: true', if both shards time out then return empty "partial" results. +function allowPartialResultsTrueAllFailed(failureController) { + failureController.enable(); + const res = assert.commandWorked(runQuery(true)); + assert(res.cursor.partialResultsReturned); + assert.eq(0, res.cursor.id); + assert.eq(res.cursor.firstBatch.length, 0); + failureController.disable(); +} +allowPartialResultsTrueAllFailed(allShardsFailpoint); +allowPartialResultsTrueAllFailed(allshardsNetworkFailure); +allowPartialResultsTrueAllFailed(allshardsMixedFailures); + +st.stop(); +}()); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 34c117e6671..30d6b3f16e3 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -500,11 +500,24 @@ Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) { // Before scheduling more work, check whether the cursor has been invalidated. 
_assertNotInvalidated(lk); + // Reveal opCtx errors (such as MaxTimeMSExpired) and reflect them in the remote status. + invariant(_opCtx, "Cannot schedule a getMore without an OperationContext"); + const auto interruptStatus = _opCtx->checkForInterruptNoAssert(); + if (!interruptStatus.isOK()) { + for (size_t i = 0; i < _remotes.size(); ++i) { + if (!_remotes[i].exhausted()) { + _cleanUpFailedBatch(lk, interruptStatus, i); + } + } + return interruptStatus; + } + // Schedule remote work on hosts for which we need more results. for (size_t i = 0; i < _remotes.size(); ++i) { auto& remote = _remotes[i]; if (!remote.status.isOK()) { + _cleanUpFailedBatch(lk, remote.status, i); return remote.status; } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 9b5505ddc36..de8bb25dbd6 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -299,8 +299,9 @@ private: // The identity of the shard which the cursor belongs to. ShardId shardId; - // This flag is set if the connection to the remote shard was lost, or never established in - // the first place. Only applicable if the 'allowPartialResults' option is enabled. + // This flag is set if the connection to the remote shard was lost or never established in + // the first place or the connection is interrupted due to MaxTimeMSExpired. + // Only applicable if the 'allowPartialResults' option is enabled. bool partialResultsReturned = false; // The buffer of results that have been retrieved but not yet returned to the caller. 
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index fd5be3317f9..f92195f2925 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -978,6 +978,60 @@ TEST_F(AsyncResultsMergerTest, KillCursorCmdHasNoTimeout) { killFuture.wait(); } +TEST_F(AsyncResultsMergerTest, KillBeforeNext) { + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); + + // Mark the OperationContext as killed from this thread. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->markKilled(ErrorCodes::Interrupted); + } + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto nextStatus = arm->nextEvent(); + ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); + }); + + // Wait for the merger to be interrupted. + future.default_timed_get(); + + // Kill should complete. + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); +} + +TEST_F(AsyncResultsMergerTest, KillBeforeNextWithTwoRemotes) { + std::vector<RemoteCursor> cursors; + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors)); + + // Mark the OperationContext as killed from this thread. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->markKilled(ErrorCodes::Interrupted); + } + + // Issue a blocking wait for the next result asynchronously on a different thread. 
+ auto future = launchAsync([&]() { + auto nextStatus = arm->nextEvent(); + ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); + }); + + // Wait for the merger to be interrupted. + future.default_timed_get(); + + // Kill should complete. + auto killFuture = arm->kill(operationContext()); + killFuture.wait(); +} + TEST_F(AsyncResultsMergerTest, TailableBasic) { BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true}"); std::vector<RemoteCursor> cursors; @@ -1256,6 +1310,110 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { ASSERT_TRUE(arm->ready()); } +TEST_F(AsyncResultsMergerTest, MaxTimeMSExpiredAllowPartialResultsTrue) { + BSONObj findCmd = BSON("find" + << "testcoll" + << "allowPartialResults" << true); + std::vector<RemoteCursor> cursors; + // Two shards. + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // First host returns single result. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; + responses.emplace_back(kTestNss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses)); + + // From the second host we get a MaxTimeMSExpired error. 
+ scheduleErrorResponse({ErrorCodes::MaxTimeMSExpired, "MaxTimeMSExpired"}); + + executor()->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->partialResultsReturned()); + ASSERT_TRUE(arm->remotesExhausted()); + ASSERT_TRUE(arm->ready()); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); + ASSERT_TRUE(unittest::assertGet(arm->nextReady()).isEOF()); +} + +TEST_F(AsyncResultsMergerTest, MaxTimeMSExpiredAllowPartialResultsFalse) { + BSONObj findCmd = BSON("find" + << "testcoll" + << "allowPartialResults" << false); + std::vector<RemoteCursor> cursors; + // two shards + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // First host returns single result + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; + responses.emplace_back(kTestNss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses)); + + // From the second host we get a MaxTimeMSExpired error. + scheduleErrorResponse({ErrorCodes::MaxTimeMSExpired, "MaxTimeMSExpired"}); + + executor()->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + auto statusWithNext = arm->nextReady(); + ASSERT(!statusWithNext.isOK()); + ASSERT_EQ(statusWithNext.getStatus().code(), ErrorCodes::MaxTimeMSExpired); + // Required to kill the 'arm' on error before destruction. 
+ auto killFuture = arm->kill(operationContext()); + killFuture.wait(); +} + +TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnMaxTimeMSExpiredThenLateData) { + BSONObj findCmd = fromjson("{find: 'testcoll', allowPartialResults: true}"); + std::vector<RemoteCursor> cursors; + // two shards + cursors.push_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + cursors.push_back( + makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(kTestNss, 2, {}))); + auto arm = makeARMFromExistingCursors(std::move(cursors), findCmd); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // From the first host we get a MaxTimeMSExpired error. + scheduleErrorResponse({ErrorCodes::MaxTimeMSExpired, "MaxTimeMSExpired"}); + + // Second host returns single result *after* first host times out. + std::vector<CursorResponse> responses; + std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; + responses.emplace_back(kTestNss, CursorId(0), batch); + scheduleNetworkResponses(std::move(responses)); + + executor()->waitForEvent(readyEvent); + + ASSERT_TRUE(arm->ready()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_TRUE(arm->partialResultsReturned()); + ASSERT_TRUE(arm->remotesExhausted()); + ASSERT_TRUE(arm->ready()); +} + TEST_F(AsyncResultsMergerTest, ReturnsErrorOnRetriableError) { BSONObj findCmd = fromjson("{find: 'testcoll', sort: {_id: 1}}"); std::vector<RemoteCursor> cursors; diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp index 15e37b0460d..5871d49d4ac 100644 --- a/src/mongo/s/query/blocking_results_merger_test.cpp +++ b/src/mongo/s/query/blocking_results_merger_test.cpp @@ -196,6 +196,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptibleDuringBlockingNext) { ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); 
}); + // Sleep to allow the blockingMerger.next() thread to started. If the thread is not started + // even after the sleep, then it's the same test as ShouldBeInterruptibleBeforeBlockingNext. + sleepmillis(10); + // Now mark the OperationContext as killed from this thread. { stdx::lock_guard<Client> lk(*operationContext()->getClient()); @@ -220,6 +224,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptibleDuringBlockingNext) { future.default_timed_get(); } +TEST_F(ResultsMergerTestFixture, ShouldBeInterruptibleBeforeBlockingNext) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + BlockingResultsMerger blockingMerger( + operationContext(), std::move(params), executor(), nullptr); + + // Mark the OperationContext as killed from this thread. + { + stdx::lock_guard<Client> lk(*operationContext()->getClient()); + operationContext()->markKilled(ErrorCodes::Interrupted); + } + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto nextStatus = blockingMerger.next(operationContext()); + ASSERT_EQ(nextStatus.getStatus(), ErrorCodes::Interrupted); + }); + + // Wait for the merger to be interrupted. + future.default_timed_get(); + + // Kill should complete. 
+ blockingMerger.kill(operationContext()); +} + TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) { class ThrowyResourceYielder : public ResourceYielder { public: diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index b08a807788c..444b0615d99 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -103,6 +103,7 @@ StatusWith<ClusterQueryResult> ClusterClientCursorImpl::next() { invariant(_opCtx); const auto interruptStatus = _opCtx->checkForInterruptNoAssert(); if (!interruptStatus.isOK()) { + _maxTimeMSExpired |= (interruptStatus.code() == ErrorCodes::MaxTimeMSExpired); return interruptStatus; } @@ -156,7 +157,8 @@ const PrivilegeVector& ClusterClientCursorImpl::getOriginatingPrivileges() const } bool ClusterClientCursorImpl::partialResultsReturned() const { - return _root->partialResultsReturned(); + // We may have timed out in this layer, or within the plan tree waiting for results from shards. + return (_maxTimeMSExpired && _params.isAllowPartialResults) || _root->partialResultsReturned(); } std::size_t ClusterClientCursorImpl::getNumRemotes() const { diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 2529254cfce..4020918761c 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -177,6 +177,9 @@ private: // The number of batches returned by this cursor. std::uint64_t _nBatchesReturned = 0; + + // Whether ClusterClientCursor::next() was interrupted due to MaxTimeMSExpired. 
+ bool _maxTimeMSExpired = false; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8c3ba12e097..77d9a818f7b 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -314,13 +314,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, establishCursorsOnShards({cm.dbPrimary()}); MONGO_UNREACHABLE; } - throw; } - // Determine whether the cursor we may eventually register will be single- or multi-target. - const auto cursorType = params.remotes.size() > 1 ? ClusterCursorManager::CursorType::MultiTarget : ClusterCursorManager::CursorType::SingleTarget; @@ -335,7 +332,6 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, } // Transfer the established cursors to a ClusterClientCursor. - auto ccc = ClusterClientCursorImpl::make( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); @@ -343,16 +339,66 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, FindCommon::waitInFindBeforeMakingBatch(opCtx, query); + // If we're allowing partial results and we got MaxTimeMSExpired, then temporarily disable + // interrupts in the opCtx so that we can pull already-fetched data from ClusterClientCursor. + bool ignoringInterrupts = false; + if (findCommand.getAllowPartialResults() && + opCtx->checkForInterruptNoAssert().code() == ErrorCodes::MaxTimeMSExpired) { + // MaxTimeMS is expired, but perhaps remotes not have expired their requests yet. + // Wait for all remote cursors to be exhausted so that we can safely disable interrupts + // in the opCtx. We want to be sure that later calls to ccc->next() do not block on + // more data. + + // Maximum number of 1ms sleeps to wait for remote cursors to be exhausted. 
+ constexpr int kMaxAttempts = 10; + for (int remainingAttempts = kMaxAttempts; !ccc->remotesExhausted(); remainingAttempts--) { + if (!remainingAttempts) { + LOGV2_DEBUG( + 5746900, + 0, + "MaxTimeMSExpired error was seen on the router, but partial results cannot be " + "returned because the remotes did not give the expected MaxTimeMS error within " + "kMaxAttempts."); + // Reveal the MaxTimeMSExpired error. + opCtx->checkForInterrupt(); + } + stdx::this_thread::sleep_for(stdx::chrono::milliseconds(1)); + } + + // The first MaxTimeMSExpired will have called opCtx->markKilled() so any later + // call to opCtx->checkForInterruptNoAssert() will return an error. We need to + // temporarily ignore this while we pull data from the ClusterClientCursor. + LOGV2_DEBUG( + 5746901, + 0, + "Attempting to return partial results because MaxTimeMS expired and the query set " + "AllowPartialResults. Temporarily disabling interrupts on the OperationContext " + "while partial results are pulled from the ClusterClientCursor."); + opCtx->setIgnoreInterruptsExceptForReplStateChange(true); + ignoringInterrupts = true; + } + auto cursorState = ClusterCursorManager::CursorState::NotExhausted; size_t bytesBuffered = 0; - // This loop will not result in actually calling getMore against shards, but just loading - // results from the initial batches (that were obtained while establishing cursors) into - // 'results'. + // This loop will load enough results from the shards for a full first batch. At first, these + // results come from the initial batches that were obtained when establishing cursors, but + // ClusterClientCursor::next will fetch further results if necessary. 
while (!FindCommon::enoughForFirstBatch(findCommand, results->size())) { - auto next = uassertStatusOK(ccc->next()); - - if (next.isEOF()) { + auto nextWithStatus = ccc->next(); + if (findCommand.getAllowPartialResults() && + (nextWithStatus.getStatus() == ErrorCodes::MaxTimeMSExpired)) { + if (ccc->remotesExhausted()) { + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; + } + // Continue because there may be results waiting from other remotes. + continue; + } else { + // all error statuses besides permissible remote timeouts should be returned to the user + uassertStatusOK(nextWithStatus); + } + if (nextWithStatus.getValue().isEOF()) { // We reached end-of-stream. If the cursor is not tailable, then we mark it as // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even // when we reach end-of-stream. However, if all the remote cursors are exhausted, there @@ -363,7 +409,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, break; } - auto nextObj = *next.getResult(); + auto nextObj = *(nextWithStatus.getValue().getResult()); // If adding this object will cause us to exceed the message size limit, then we stash it // for later. @@ -378,6 +424,19 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, results->push_back(std::move(nextObj)); } + if (ignoringInterrupts) { + opCtx->setIgnoreInterruptsExceptForReplStateChange(false); + ignoringInterrupts = false; + LOGV2_DEBUG(5746902, 0, "Re-enabled interrupts on the OperationContext."); + } + + // Surface any opCtx interrupts, except ignore MaxTimeMSExpired with allowPartialResults. 
+ auto interruptStatus = opCtx->checkForInterruptNoAssert(); + if (!(interruptStatus.code() == ErrorCodes::MaxTimeMSExpired && + findCommand.getAllowPartialResults())) { + uassertStatusOK(interruptStatus); + } + ccc->detachFromOperationContext(); if (findCommand.getSingleBatch() && !ccc->isTailable()) { @@ -390,6 +449,7 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, // If the caller wants to know whether the cursor returned partial results, set it here. if (partialResultsReturned) { + // Missing results can come either from the first batches or from the ccc's later batches. *partialResultsReturned = ccc->partialResultsReturned(); } @@ -811,6 +871,16 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, } if (!next.isOK()) { + if (next.getStatus() == ErrorCodes::MaxTimeMSExpired && + pinnedCursor.getValue()->partialResultsReturned()) { + // Break to return partial results rather than return a MaxTimeMSExpired error + cursorState = ClusterCursorManager::CursorState::Exhausted; + LOGV2_DEBUG(5746903, + 0, + "Attempting to return partial results because MaxTimeMS expired and " + "the query set AllowPartialResults."); + break; + } return next.getStatus(); } diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index e3de9821128..21219f3db84 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -270,10 +270,11 @@ void CursorEstablisher::_handleFailure(const AsyncRequestsSender::Response& resp return; } - // Retriable errors are swallowed if '_allowPartialResults' is true. Targeting shard replica - // sets can also throw FailedToSatisfyReadPreference, so we swallow it too. + // If '_allowPartialResults' is true then swallow retriable errors, maxTimeMSExpired, and + // FailedToSatisfyReadPreference errors we might get when targeting shard replica sets. 
bool isEligibleException = (isMongosRetriableError(status.code()) || - status.code() == ErrorCodes::FailedToSatisfyReadPreference); + status.code() == ErrorCodes::FailedToSatisfyReadPreference || + status.code() == ErrorCodes::MaxTimeMSExpired); if (_allowPartialResults && isEligibleException) { // This exception is eligible to be swallowed. Add an entry with a cursorID of 0, an // empty HostAndPort, and which has the 'partialResultsReturned' flag set to true. diff --git a/src/mongo/s/query/establish_cursors_test.cpp b/src/mongo/s/query/establish_cursors_test.cpp index 1c73abf55a2..9e18a623034 100644 --- a/src/mongo/s/query/establish_cursors_test.cpp +++ b/src/mongo/s/query/establish_cursors_test.cpp @@ -692,6 +692,44 @@ TEST_F(EstablishCursorsTest, MultipleRemotesOneRemoteMaxesOutRetriableErrorsAllo future.default_timed_get(); } +TEST_F(EstablishCursorsTest, MultipleRemotesAllMaxOutRetriableErrorsAllowPartialResults) { + BSONObj cmdObj = fromjson("{find: 'testcoll'}"); + std::vector<std::pair<ShardId, BSONObj>> remotes{ + {kTestShardIds[0], cmdObj}, {kTestShardIds[1], cmdObj}, {kTestShardIds[2], cmdObj}}; + + // Failure to establish a cursor due to maxing out retriable errors on all three remotes + // does not return an error, because allowPartialResults is true. + auto future = launchAsync([&] { + auto cursors = establishCursors(operationContext(), + executor(), + _nss, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + remotes, + true); // allowPartialResults + // allowPartialResults is true so ignore the fact that all remotes will have failed + // to establish a cursor due to maxing out retriable errors. The cursor entry + // is marked as 'partialResultsReturned:true', with a CursorId of 0 and no HostAndPort. 
+ ASSERT_EQ(remotes.size(), cursors.size()); + for (auto&& cursor : cursors) { + ASSERT(cursor.getHostAndPort().empty()); + ASSERT(cursor.getCursorResponse().getPartialResultsReturned()); + ASSERT_EQ(cursor.getCursorResponse().getCursorId(), CursorId{0}); + } + }); + + // All remotes always respond with retriable errors. + for (auto it = remotes.begin(); it != remotes.end(); ++it) { + for (int i = 0; i < kMaxRetries + 1; ++i) { + onCommand([&](const RemoteCommandRequest& request) { + ASSERT_EQ(_nss.coll(), request.cmdObj.firstElement().valueStringData()); + return Status(ErrorCodes::HostUnreachable, "host unreachable"); + }); + } + } + + future.default_timed_get(); +} + TEST_F(EstablishCursorsTest, InterruptedWithDanglingRemoteRequest) { BSONObj cmdObj = fromjson("{find: 'testcoll'}"); std::vector<std::pair<ShardId, BSONObj>> remotes{ |