diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-04-13 15:13:36 -0400 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2018-08-22 06:10:33 +0100 |
commit | e1f4285c2f918c70e52ac99e5285d1179c04d2a0 (patch) | |
tree | 83d67c507d5570cae5611c5489463e6d9853f75a | |
parent | eca2a99916c7fd7c74f8efbb836d5d2458d7a450 (diff) | |
download | mongo-e1f4285c2f918c70e52ac99e5285d1179c04d2a0.tar.gz |
SERVER-34204 Always pass non-null opCtx when scheduling getMores in ARM
(cherry picked from commit a43fe9ae73752fbd98107cef5421341fe291ab32)
-rw-r--r-- | buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml | 72 | ||||
-rw-r--r-- | etc/evergreen.yml | 20 | ||||
-rw-r--r-- | jstests/change_streams/only_wake_getmore_for_relevant_changes.js | 65 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 155 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 50 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 24 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 229 | ||||
-rw-r--r-- | src/mongo/s/query/router_stage_merge.cpp | 12 | ||||
-rw-r--r-- | src/mongo/util/concurrency/old_thread_pool.cpp | 2 | ||||
-rw-r--r-- | src/mongo/util/concurrency/old_thread_pool.h | 1 |
12 files changed, 360 insertions, 328 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml new file mode 100644 index 00000000000..10d8f35cca4 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -0,0 +1,72 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # This test exercises an internal detail of mongos<->mongod communication and is not expected to + # work against a mongos. + - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data. + - jstests/change_streams/change_stream_shell_helper.js + - jstests/change_streams/lookup_post_image.js + exclude_with_any_tags: + ## + # The next three tags correspond to the special errors thrown by the + # set_read_and_write_concerns.js override when it refuses to replace the readConcern or + # writeConcern of a particular command. Above each tag are the message(s) that cause the tag to be + # warranted. + ## + # "Cowardly refusing to override read concern of command: ..." + - assumes_read_concern_unchanged + # "Cowardly refusing to override write concern of command: ..." + - assumes_write_concern_unchanged + # "Cowardly refusing to run test with overridden write concern when it uses a command that can + # only perform w=1 writes: ..." + - requires_eval_command + # Transactions not supported on sharded clusters. + - uses_transactions + +executor: + archive: + hooks: + - CheckReplDBHash + - ValidateCollections + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: majority + enableMajorityReadConcern: '' + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/enable_sessions.js'); + readMode: commands + hooks: + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + # Use two shards to make sure we will only talk to the primary shard for the database and will + # not delay changes to wait for notifications or a clock advancement from other shards. + num_shards: 2 + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + nopreallocj: '' + set_parameters: + enableTestCommands: 1 + numInitialSyncAttempts: 1 + periodicNoopIntervalSecs: 1 + writePeriodicNoops: true + num_rs_nodes_per_shard: 1 + # This test suite doesn't actually shard any collections, but enabling sharding will prevent + # read commands against non-existent databases from unconditionally returning a CursorId of 0. + enable_sharding: + - test diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 543c2415862..d9b4f9bb51e 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -3410,6 +3410,17 @@ tasks: run_multiple_jobs: true - <<: *task_template + name: change_streams_mongos_sessions_passthrough_WT + depends_on: + - name: change_streams_WT + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_mongos_sessions_passthrough --storageEngine=wiredTiger + run_multiple_jobs: true + +- <<: *task_template name: change_streams_mongos_passthrough_WT depends_on: - name: change_streams_WT @@ -5502,6 +5513,7 @@ buildvariants: - name: auth_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_secondary_reads_WT - name: change_streams_sharded_collections_passthrough_WT - name: dbtest_WT @@ -5628,6 +5640,7 @@ buildvariants: - name: causally_consistent_jscore_passthrough_auth_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT @@ -6447,6 +6460,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_sharded_collections_passthrough_WT - name: dbtest_WT - name: disk_WT @@ -7002,6 +7016,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT @@ -7212,6 +7227,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_secondary_reads_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT @@ -7576,6 +7592,7 @@ buildvariants: - name: bulk_gle_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_secondary_reads_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT @@ -9018,6 +9035,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency distros: @@ -9399,6 +9417,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_secondary_reads_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT @@ -9557,6 +9576,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_mongos_sessions_passthrough_WT - name: change_streams_secondary_reads_WT - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js index 84f4d0ae979..8a42a176628 100644 --- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js +++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js @@ -21,38 +21,41 @@ // the getMore to run. To prevent this from happening, the main thread waits for an insert // into "sentinel", to signal that the parallel shell has started and is waiting for the // getMore to appear in currentOp. - const shellSentinelCollection = assertDropAndRecreateCollection(db, "shell_sentinel"); - const port = (collection.stats().sharded ? collection.getMongo().port : FixtureHelpers.getPrimaryForNodeHostingDatabase(db).port); - const awaitShellDoingEventDuringGetMore = startParallelShell(` -// Signal that the parallel shell has started. -assert.writeOK(db.getCollection("${ shellSentinelCollection.getName() }").insert({})); - -// Wait for the getMore to appear in currentOp. -assert.soon(function() { - return db.currentOp({ - op: "getmore", - "command.collection": "${collection.getName()}", - "originatingCommand.comment": "${identifyingComment}", - }).inprog.length > 0; -}); + const sentinelCountBefore = shellSentinelCollection.find().itcount(); -const eventFn = ${ event.toString() }; -eventFn();`, + const awaitShellDoingEventDuringGetMore = startParallelShell(` + // Wait for the getMore to appear in currentOp. + assert.soon(function() { + return db.currentOp({ + op: "getmore", + "originatingCommand.comment": "${identifyingComment}", + }).inprog.length > 0; + }); + const eventFn = ${event.toString()}; + eventFn(); + //Signal that the parallel shell has completed its event function. + assert.writeOK(db.getCollection("${shellSentinelCollection.getName()}").insert({}));`, port); - // Wait for the shell to start. - assert.soon(() => shellSentinelCollection.findOne() != null); - // Run and time the getMore. - const startTime = (new Date()).getTime(); - const result = assert.commandWorked(db.runCommand( - {getMore: awaitDataCursorId, collection: collection.getName(), maxTimeMS: maxTimeMS})); + let startTime, result, elapsedMs; + assert.soon(function() { + startTime = (new Date()).getTime(); + result = assert.commandWorked(db.runCommand({ + getMore: awaitDataCursorId, + collection: collection.getName(), + maxTimeMS: maxTimeMS + })); + elapsedMs = (new Date()).getTime() - startTime; + return result.cursor.nextBatch.length > 0 || + shellSentinelCollection.find().itcount() > sentinelCountBefore; + }); awaitShellDoingEventDuringGetMore(); - return {result: result, elapsedMs: (new Date()).getTime() - startTime}; + return {result: result, elapsedMs: elapsedMs}; } /** @@ -107,7 +110,10 @@ eventFn();`, return result; } + // Refresh all collections which will be required in the course of this test. + const shellSentinelCollection = assertDropAndRecreateCollection(db, "shell_sentinel"); const changesCollection = assertDropAndRecreateCollection(db, "changes"); + const unrelatedCollection = assertDropCollection(db, "unrelated_collection"); // Start a change stream cursor. const wholeCollectionStreamComment = "change stream on entire collection"; @@ -131,16 +137,15 @@ eventFn();`, event: () => assert.writeOK(db.changes.insert({_id: "wake up"})) }); assert.eq(getMoreResponse.cursor.nextBatch.length, 1); - assert.docEq(getMoreResponse.cursor.nextBatch[0], { - documentKey: {_id: "wake up"}, - fullDocument: {_id: "wake up"}, - ns: {db: db.getName(), coll: changesCollection.getName()}, - operationType: "insert" - }); + assert.eq(getMoreResponse.cursor.nextBatch[0].operationType, + "insert", + tojson(getMoreResponse.cursor.nextBatch[0])); + assert.eq(getMoreResponse.cursor.nextBatch[0].fullDocument, + {_id: "wake up"}, + tojson(getMoreResponse.cursor.nextBatch[0])); // Test that an insert to an unrelated collection will not cause the change stream to wake up // and return an empty batch before reaching the maxTimeMS. - assertDropCollection(db, "unrelated_collection"); assertEventDoesNotWakeCursor({ collection: changesCollection, awaitDataCursorId: changeCursorId, diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index fa418d338fb..2ae3df0e8f5 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -104,14 +104,33 @@ BSONObj BaseClonerTest::createListIndexesResponse(CursorId cursorId, const BSONA return createListIndexesResponse(cursorId, specs, "firstBatch"); } +namespace { +struct EnsureClientHasBeenInitialized : public executor::ThreadPoolMock::Options { + EnsureClientHasBeenInitialized() : executor::ThreadPoolMock::Options() { + onCreateThread = []() { Client::initThread("CollectionClonerTestThread"); }; + } +}; +} // namespace + BaseClonerTest::BaseClonerTest() - : _mutex(), _setStatusCondition(), _status(getDetectableErrorStatus()) {} + : ThreadPoolExecutorTest(EnsureClientHasBeenInitialized()), + _mutex(), + _setStatusCondition(), + _status(getDetectableErrorStatus()) {} void BaseClonerTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); clear(); launchExecutorThread(); - dbWorkThreadPool = stdx::make_unique<OldThreadPool>(1); + + Client::initThread("CollectionClonerTest"); + ThreadPool::Options options; + options.minThreads = 1U; + options.maxThreads = 1U; + options.onCreateThread = [](StringData threadName) { Client::initThread(threadName); }; + dbWorkThreadPool = stdx::make_unique<OldThreadPool>(options); + dbWorkThreadPool->startThreads(); + storageInterface.reset(new StorageInterfaceMock()); } @@ -122,6 +141,7 @@ void BaseClonerTest::tearDown() { storageInterface.reset(); dbWorkThreadPool->join(); dbWorkThreadPool.reset(); + Client::releaseCurrent(); } void BaseClonerTest::clear() { diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index ffcbf7fafb0..1d26394e113 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -246,8 +246,17 @@ void CollectionCloner::shutdown() { void CollectionCloner::_cancelRemainingWork_inlock() { if (_arm) { - Client::initThreadIfNotAlready(); - _killArmHandle = _arm->kill(cc().getOperationContext()); + // This method can be called from a callback from either a TaskExecutor or a TaskRunner. The + // TaskExecutor should never have an OperationContext attached to the Client, and the + // TaskRunner should always have an OperationContext attached. Unfortunately, we don't know + // which situation we're in, so have to handle both. + auto& client = cc(); + if (auto opCtx = client.getOperationContext()) { + _killArmHandle = _arm->kill(opCtx); + } else { + auto newOpCtx = client.makeOperationContext(); + _killArmHandle = _arm->kill(newOpCtx.get()); + } } _countScheduler.shutdown(); _listIndexesFetcher.shutdown(); @@ -650,9 +659,12 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa _clusterClientCursorParams->remotes = std::move(remoteCursors); if (_collectionCloningBatchSize > 0) _clusterClientCursorParams->batchSize = _collectionCloningBatchSize; - Client::initThreadIfNotAlready(); + // Client::initThreadIfNotAlready(); + auto opCtx = cc().makeOperationContext(); _arm = stdx::make_unique<AsyncResultsMerger>( - cc().getOperationContext(), _executor, _clusterClientCursorParams.get()); + opCtx.get(), _executor, _clusterClientCursorParams.get()); + _arm->detachFromOperationContext(); + opCtx.reset(); // This completion guard invokes _finishCallback on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; @@ -665,7 +677,6 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa // outside the mutex. This is a necessary condition to invoke _finishCallback. stdx::lock_guard<stdx::mutex> lock(_mutex); Status scheduleStatus = _scheduleNextARMResultsCallback(onCompletionGuard); - _arm->detachFromOperationContext(); if (!scheduleStatus.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, scheduleStatus); return; @@ -689,9 +700,10 @@ StatusWith<std::vector<BSONElement>> CollectionCloner::_parseParallelCollectionS } Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { - Client::initThreadIfNotAlready(); - auto opCtx = cc().getOperationContext(); - _arm->reattachToOperationContext(opCtx); + // We expect this callback to execute in a thread from a TaskExecutor which will not have an + // OperationContext populated. We must make one ourselves. + auto opCtx = cc().makeOperationContext(); + _arm->reattachToOperationContext(opCtx.get()); while (_arm->ready()) { auto armResultStatus = _arm->nextReady(); if (!armResultStatus.getStatus().isOK()) { @@ -712,8 +724,10 @@ Status CollectionCloner::_bufferNextBatchFromArm(WithLock lock) { Status CollectionCloner::_scheduleNextARMResultsCallback( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - Client::initThreadIfNotAlready(); - _arm->reattachToOperationContext(cc().getOperationContext()); + // We expect this callback to execute in a thread from a TaskExecutor which will not have an + // OperationContext populated. We must make one ourselves. + auto opCtx = cc().makeOperationContext(); + _arm->reattachToOperationContext(opCtx.get()); auto nextEvent = _arm->nextEvent(); _arm->detachFromOperationContext(); if (!nextEvent.isOK()) { diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 32c8ea3e568..7d9666d2907 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -1129,68 +1129,6 @@ TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(CollectionClonerTest, MiddleBatchContainsNoDocuments) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 2); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(2, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBeforeStartup) { collectionCloner->shutdown(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, collectionCloner->startup()); @@ -1969,99 +1907,6 @@ TEST_F(ParallelCollectionClonerTest, InsertDocumentsWithMultipleCursorsOfDiffere ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(ParallelCollectionClonerTest, MiddleBatchContainsNoDocumentsWithMultipleCursors) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("cursors" << BSON_ARRAY(createCursorResponse(1, emptyArray) - << createCursorResponse(2, emptyArray) - << createCursorResponse(3, emptyArray)) - << "ok" - << 1)); - } - collectionCloner->waitForDbWorker(); - - auto exec = &getExecutor(); - std::vector<BSONObj> docs; - // Record the buffered documents before they are inserted so we can - // validate them. - collectionCloner->setScheduleDbWorkFn_forTest( - [&](const executor::TaskExecutor::CallbackFn& workFn) { - auto buffered = collectionCloner->getDocumentsToInsert_forTest(); - docs.insert(docs.end(), buffered.begin(), buffered.end()); - return exec->scheduleWork(workFn); - }); - - ASSERT_TRUE(collectionCloner->isActive()); - - int numDocs = 6; - std::vector<BSONObj> generatedDocs = generateDocs(numDocs); - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(generatedDocs[0]))); - processNetworkResponse(createCursorResponse(2, BSON_ARRAY(generatedDocs[1]))); - processNetworkResponse(createCursorResponse(3, BSON_ARRAY(generatedDocs[2]))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3U, docs.size()); - for (int i = 0; i < 3; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(2, emptyArray, "nextBatch")); - processNetworkResponse(createCursorResponse(3, emptyArray, "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(3, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[3]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[4]))); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(generatedDocs[5]))); - } - - collectionCloner->join(); - - ASSERT_EQUALS(6U, docs.size()); - for (int i = 3; i < 6; i++) { - ASSERT_BSONOBJ_EQ(generatedDocs[i], docs[i]); - } - - ASSERT_EQUALS(numDocs, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - TEST_F(ParallelCollectionClonerTest, LastBatchContainsNoDocumentsWithMultipleCursors) { ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 241f09f209b..b9ef1290fa6 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -326,6 +326,7 @@ ClusterQueryResult AsyncResultsMerger::_nextReadyUnsorted(WithLock) { } Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { + invariant(_opCtx, "Cannot schedule a getMore without an OperationContext"); auto& remote = _remotes[remoteIndex]; invariant(!remote.cbHandle.isValid()); @@ -364,6 +365,32 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { return Status::OK(); } +Status AsyncResultsMerger::scheduleGetMores() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _scheduleGetMores(lk); +} + +Status AsyncResultsMerger::_scheduleGetMores(WithLock lk) { + // Schedule remote work on hosts for which we need more results. + for (size_t i = 0; i < _remotes.size(); ++i) { + auto& remote = _remotes[i]; + + if (!remote.status.isOK()) { + return remote.status; + } + + if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { + // If this remote is not exhausted and there is no outstanding request for it, schedule + // work to retrieve the next batch. + auto nextBatchStatus = _askForNextBatch(lk, i); + if (!nextBatchStatus.isOK()) { + return nextBatchStatus; + } + } + } + return Status::OK(); +} + /* * Note: When nextEvent() is called to do retries, only the remotes with retriable errors will * be rescheduled because: @@ -388,22 +415,9 @@ StatusWith<executor::TaskExecutor::EventHandle> AsyncResultsMerger::nextEvent() "nextEvent() called before an outstanding event was signaled"); } - // Schedule remote work on hosts for which we need more results. - for (size_t i = 0; i < _remotes.size(); ++i) { - auto& remote = _remotes[i]; - - if (!remote.status.isOK()) { - return remote.status; - } - - if (!remote.hasNext() && !remote.exhausted() && !remote.cbHandle.isValid()) { - // If this remote is not exhausted and there is no outstanding request for it, schedule - // work to retrieve the next batch. - auto nextBatchStatus = _askForNextBatch(lk, i); - if (!nextBatchStatus.isOK()) { - return nextBatchStatus; - } - } + auto getMoresStatus = _scheduleGetMores(lk); + if (!getMoresStatus.isOK()) { + return getMoresStatus; } auto eventStatus = _executor->makeEvent(); @@ -570,9 +584,11 @@ void AsyncResultsMerger::_processBatchResults(WithLock lk, if (_params->tailableMode == TailableMode::kTailable && !remote.hasNext()) { invariant(_remotes.size() == 1); _eofNext = true; - } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive) { + } else if (!remote.hasNext() && !remote.exhausted() && _lifecycleState == kAlive && _opCtx) { // If this is normal or tailable-awaitData cursor and we still don't have anything buffered // after receiving this batch, we can schedule work to retrieve the next batch right away. + // Be careful only to do this when '_opCtx' is non-null, since it is illegal to schedule a + // remote command on a user's behalf without a non-null OperationContext. remote.status = _askForNextBatch(lk, remoteIndex); } } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index da21593486c..b48e5004a56 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -179,6 +179,24 @@ public: StatusWith<executor::TaskExecutor::EventHandle> nextEvent(); /** + * Schedules a getMore on any remote hosts which: + * - Do not have an error status set already. + * - Don't already have a request outstanding. + * - We don't currently have any results buffered. + * - Are not exhausted (have a non-zero cursor id). + * Returns an error if any of the remotes responded with an error, or if we encounter an error + * while scheduling the getMore requests.. + * + * In most cases users should call nextEvent() instead of this method, but this can be necessary + * if the caller of nextEvent() calls detachFromOperationContext() before the event is signaled. + * In such cases, the ARM cannot schedule getMores itself, and will need to be manually prompted + * after calling reattachToOperationContext(). + * + * It is illegal to call this method if the ARM is not attached to an OperationContext. + */ + Status scheduleGetMores(); + + /** * Adds the specified shard cursors to the set of cursors to be merged. The results from the * new cursors will be returned as normal through nextReady(). */ @@ -368,6 +386,12 @@ private: */ bool _haveOutstandingBatchRequests(WithLock); + + /** + * Schedules a getMore on any remote hosts which we need another batch from. + */ + Status _scheduleGetMores(WithLock); + /** * Schedules a killCursors command to be run on all remote hosts that have open cursors. */ diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 8155c50237c..8c82fbdbb0c 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -43,6 +43,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_test_fixture.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -62,6 +63,10 @@ const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", HostAndPort("FakeShard2Host", 12345), HostAndPort("FakeShard3Host", 12345)}; +LogicalSessionId parseSessionIdFromCmd(BSONObj cmdObj) { + return LogicalSessionId::parse(IDLParserErrorContext("lsid"), cmdObj["lsid"].Obj()); +} + class AsyncResultsMergerTest : public ShardingTestFixture { public: AsyncResultsMergerTest() : _nss("testdb.testcoll") {} @@ -155,11 +160,12 @@ protected: /** * Schedules a list of cursor responses to be returned by the mock network. */ - void scheduleNetworkResponses(std::vector<CursorResponse> responses, - CursorResponse::ResponseType responseType) { + void scheduleNetworkResponses(std::vector<CursorResponse> responses) { std::vector<BSONObj> objs; for (const auto& cursorResponse : responses) { - objs.push_back(cursorResponse.toBSON(responseType)); + // For tests of the AsyncResultsMerger, all CursorRepsonses scheduled by the tests are + // subsequent responses, since the AsyncResultsMerger will only ever run getMores. + objs.push_back(cursorResponse.toBSON(CursorResponse::ResponseType::SubsequentResponse)); } scheduleNetworkResponseObjs(objs); } @@ -191,6 +197,11 @@ protected: return retRequest; } + bool networkHasReadyRequests() { + NetworkInterfaceMock::InNetworkGuard guard(network()); + return guard->hasReadyRequests(); + } + void scheduleErrorResponse(ResponseStatus rs) { invariant(!rs.isOK()); rs.elapsedMillis = Milliseconds(0); @@ -243,8 +254,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardUnsorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -287,8 +297,7 @@ TEST_F(AsyncResultsMergerTest, SingleShardSorted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -332,8 +341,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return first result. ASSERT_TRUE(arm->ready()); @@ -360,8 +368,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardUnsorted) { std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return remaining results. ASSERT_TRUE(arm->ready()); @@ -404,8 +411,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { std::vector<BSONObj> batch1 = {fromjson("{$sortKey: {'': 5}}"), fromjson("{$sortKey: {'': 6}}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is not ready to return results until receiving responses from all remotes. ASSERT_FALSE(arm->ready()); @@ -418,8 +424,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardSorted) { std::vector<BSONObj> batch2 = {fromjson("{$sortKey: {'': 3}}"), fromjson("{$sortKey: {'': 9}}")}; responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that all remotes have responded, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -463,8 +468,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(5), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return first result. ASSERT_TRUE(arm->ready()); @@ -492,8 +496,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return second shard's results. ASSERT_TRUE(arm->ready()); @@ -520,8 +523,7 @@ TEST_F(AsyncResultsMergerTest, MultiShardMultipleGets) { std::vector<BSONObj> batch3 = { fromjson("{_id: 7}"), fromjson("{_id: 8}"), fromjson("{_id: 9}")}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // ARM is ready to return remaining results. ASSERT_TRUE(arm->ready()); @@ -565,8 +567,7 @@ TEST_F(AsyncResultsMergerTest, CompoundSortKey) { std::vector<BSONObj> batch3 = {fromjson("{$sortKey: {'': 10, '': 12}}"), fromjson("{$sortKey: {'': 5, '': 9}}")}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // ARM returns all results in sorted order. @@ -610,8 +611,7 @@ TEST_F(AsyncResultsMergerTest, SortedButNoSortKey) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{a: 2, b: 1}"), fromjson("{a: 1, b: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -659,8 +659,7 @@ TEST_F(AsyncResultsMergerTest, HasFirstBatch) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -718,8 +717,7 @@ TEST_F(AsyncResultsMergerTest, OneShardHasInitialBatchOtherShardExhausted) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Now that the responses have been delivered, ARM is ready to return results. ASSERT_TRUE(arm->ready()); @@ -757,8 +755,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -779,8 +776,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(1), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); blackHoleNextRequest(); executor()->waitForEvent(readyEvent); @@ -797,8 +793,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -825,8 +820,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(456), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -883,8 +877,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); executor()->waitForEvent(readyEvent); @@ -914,8 +907,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -992,8 +984,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(123), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Kill should be able to return right away if there are no pending batches. auto killedEvent = arm->kill(operationContext()); @@ -1016,8 +1007,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Kill event will only be signalled once the callbacks for the pending batches have run. auto killedEvent = arm->kill(operationContext()); @@ -1026,13 +1016,11 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(3), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Ensure that we properly signal those waiting for more results to be ready. executor()->waitForEvent(readyEvent); @@ -1051,8 +1039,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); auto killedEvent = arm->kill(operationContext()); @@ -1087,8 +1074,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1108,8 +1094,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(123), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1137,8 +1122,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(123), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // After receiving an empty batch, the ARM should return boost::none, but remotes should not be @@ -1165,8 +1149,7 @@ TEST_F(AsyncResultsMergerTest, TailableExhaustedCursor) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // Afterwards, the ARM should return boost::none and remote cursors should be marked as @@ -1190,8 +1173,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1211,8 +1193,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { ASSERT_OK(request.getStatus()); ASSERT_EQ(*request.getValue().batchSize, 1LL); ASSERT_EQ(request.getValue().cursorid, 1LL); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1241,8 +1222,7 @@ TEST_F(AsyncResultsMergerTest, SendsSecondaryOkAsMetadata) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1274,8 +1254,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.emplace_back(_nss, CursorId(98), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(99), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1295,8 +1274,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(99), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1310,8 +1288,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResults) { responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1331,8 +1308,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsSingleNode) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(98), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1366,8 +1342,7 @@ TEST_F(AsyncResultsMergerTest, AllowPartialResultsOnRetriableErrorNoRetries) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // From the second host we get a network (retriable) error. scheduleErrorResponse({ErrorCodes::HostUnreachable, "host unreachable"}); @@ -1422,8 +1397,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1444,8 +1418,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(123), batch2); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -1465,8 +1438,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreRequestIncludesMaxTimeMS) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHasNoOplogTimestamp) { @@ -1491,8 +1463,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Still shouldn't be ready, we don't have a guarantee from each shard. ASSERT_FALSE(arm->ready()); @@ -1504,8 +1475,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1524,13 +1494,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneOrMoreRemotesHas responses.clear(); std::vector<BSONObj> batch3 = {}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, @@ -1563,8 +1531,7 @@ TEST_F(AsyncResultsMergerTest, std::vector<CursorResponse> responses; std::vector<BSONObj> batch3 = {}; responses.emplace_back(_nss, CursorId(0), batch3, boost::none, Timestamp(1, 8)); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(unittest::assertGet(arm->nextEvent())); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1579,8 +1546,7 @@ TEST_F(AsyncResultsMergerTest, responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOplogTime) { @@ -1612,8 +1578,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNotReadyIfOneRemoteHasLowerOp // Clean up the cursors. std::vector<CursorResponse> responses; responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>{}); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); auto killEvent = arm->kill(operationContext()); executor()->waitForEvent(killEvent); } @@ -1639,8 +1604,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Should be ready now. ASSERT_TRUE(arm->ready()); @@ -1661,8 +1625,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) "$sortKey: {'': Timestamp(1, 5), '': 1, '': 2}}")}; const Timestamp lastObservedSecondCursor = Timestamp(1, 5); responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1681,13 +1644,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedAfterExisting) responses.clear(); std::vector<BSONObj> batch3 = {}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting) { @@ -1711,8 +1672,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting "$sortKey: {'': Timestamp(1, 4), '': 1, '': 1}}")}; const Timestamp lastObservedFirstCursor = Timestamp(1, 6); responses.emplace_back(_nss, CursorId(123), batch1, boost::none, lastObservedFirstCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); // Should be ready now. ASSERT_TRUE(arm->ready()); @@ -1735,8 +1695,7 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting // from it. const Timestamp lastObservedSecondCursor = Timestamp(1, 5); responses.emplace_back(_nss, CursorId(456), batch2, boost::none, lastObservedSecondCursor); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); ASSERT_BSONOBJ_EQ( @@ -1755,13 +1714,11 @@ TEST_F(AsyncResultsMergerTest, SortedTailableCursorNewShardOrderedBeforeExisting responses.clear(); std::vector<BSONObj> batch3 = {}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); responses.clear(); std::vector<BSONObj> batch4 = {}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); } TEST_F(AsyncResultsMergerTest, GetMoreRequestWithoutTailableCantHaveMaxTime) { @@ -1831,14 +1788,58 @@ TEST_F(AsyncResultsMergerTest, KillShouldWaitForRemoteCommandsBeforeSchedulingKi std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(1), batch); - scheduleNetworkResponses(std::move(responses), - CursorResponse::ResponseType::SubsequentResponse); + scheduleNetworkResponses(std::move(responses)); executor()->waitForEvent(readyEvent); // Now the kill cursors command should be scheduled. executor()->waitForEvent(killEvent); } -} // namespace +TEST_F(AsyncResultsMergerTest, ShouldNotScheduleGetMoresWithoutAnOperationContext) { + BSONObj findCmd = fromjson("{find: 'testcoll', tailable: true, awaitData: true}"); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.push_back({kTestShardIds[0], kTestShardHosts[0], CursorResponse(_nss, 123, {})}); + makeCursorFromExistingCursors(std::move(cursors), findCmd); + + ASSERT_FALSE(arm->ready()); + auto readyEvent = unittest::assertGet(arm->nextEvent()); + ASSERT_FALSE(arm->ready()); + + // While detached from the OperationContext, schedule an empty batch response. Because the + // response is empty and this is a tailable cursor, the ARM will need to run another getMore on + // that host, but it should not schedule this without a non-null OperationContext. + arm->detachFromOperationContext(); + { + std::vector<CursorResponse> responses; + std::vector<BSONObj> emptyBatch; + responses.emplace_back(_nss, CursorId(123), emptyBatch); + scheduleNetworkResponses(std::move(responses)); + } + + ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(networkHasReadyRequests()); // Tests that we haven't asked for the next batch yet. + + // After manually requesting the next getMore, the ARM should be ready. + arm->reattachToOperationContext(operationContext()); + ASSERT_OK(arm->scheduleGetMores()); + + // Schedule the next getMore response. + { + std::vector<CursorResponse> responses; + std::vector<BSONObj> nonEmptyBatch = {fromjson("{_id: 1}")}; + responses.emplace_back(_nss, CursorId(123), nonEmptyBatch); + scheduleNetworkResponses(std::move(responses)); + } + ASSERT_TRUE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + ASSERT_BSONOBJ_EQ(fromjson("{_id: 1}"), *unittest::assertGet(arm->nextReady()).getResult()); + ASSERT_FALSE(arm->ready()); + ASSERT_FALSE(arm->remotesExhausted()); + + auto killedEvent = arm->kill(operationContext()); + executor()->waitForEvent(killedEvent); +} + +} // namespace } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 31c200bb004..f105c87a5da 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -102,6 +102,18 @@ StatusWith<EventHandle> RouterStageMerge::getNextEvent() { // If we abandoned a previous event due to a mongoS-side timeout, wait for it first. if (_leftoverEventFromLastTimeout) { invariant(_params->tailableMode == TailableMode::kTailableAndAwaitData); + // If we have an outstanding event from last time, then we might have to manually schedule + // some getMores for the cursors. If a remote response came back while we were between + // getMores (from the user to mongos), the response may have been an empty batch, and the + // ARM would not be able to ask for the next batch immediately since it is not attached to + // an OperationContext. Now that we have a valid OperationContext, we schedule the getMores + // ourselves. + Status getMoreStatus = _arm.scheduleGetMores(); + if (!getMoreStatus.isOK()) { + return getMoreStatus; + } + + // Return the leftover event and clear '_leftoverEventFromLastTimeout'. auto event = _leftoverEventFromLastTimeout; _leftoverEventFromLastTimeout = EventHandle(); return event; diff --git a/src/mongo/util/concurrency/old_thread_pool.cpp b/src/mongo/util/concurrency/old_thread_pool.cpp index f253d1f4b05..5ff9b61ebb9 100644 --- a/src/mongo/util/concurrency/old_thread_pool.cpp +++ b/src/mongo/util/concurrency/old_thread_pool.cpp @@ -60,6 +60,8 @@ OldThreadPool::OldThreadPool(const DoNotStartThreadsTag&, const std::string& threadNamePrefix) : _pool(makeOptions(nThreads, threadNamePrefix)) {} +OldThreadPool::OldThreadPool(ThreadPool::Options options) : _pool(std::move(options)) {} + std::size_t OldThreadPool::getNumThreads() const { return _pool.getStats().numThreads; } diff --git a/src/mongo/util/concurrency/old_thread_pool.h b/src/mongo/util/concurrency/old_thread_pool.h index 0a1f7e9887f..eed66d65f14 100644 --- a/src/mongo/util/concurrency/old_thread_pool.h +++ b/src/mongo/util/concurrency/old_thread_pool.h @@ -53,6 +53,7 @@ public: explicit OldThreadPool(const DoNotStartThreadsTag&, int nThreads = 8, const std::string& threadNamePrefix = ""); + explicit OldThreadPool(ThreadPool::Options options); std::size_t getNumThreads() const; ThreadPool::Stats getStats() const; |