diff options
author | Bernard Gorman <bernard.gorman@mongodb.com> | 2019-12-10 03:12:43 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-10 03:12:43 +0000 |
commit | fa76bfb04a062ce031e4d531b9e50a27e1bc2f76 (patch) | |
tree | eed06b5ba5aaf5d7da3199b90ab35ed2bd6c64a7 | |
parent | a822210dd9c28d62e5ce038b214206eb17d1a9bb (diff) | |
download | mongo-fa76bfb04a062ce031e4d531b9e50a27e1bc2f76.tar.gz |
SERVER-44977 Allow $changeStream with updateLookup to run directly against a shard mongoD
-rw-r--r-- | jstests/sharding/change_stream_against_shard_mongod.js | 47 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp | 8 |
2 files changed, 54 insertions, 1 deletions
diff --git a/jstests/sharding/change_stream_against_shard_mongod.js b/jstests/sharding/change_stream_against_shard_mongod.js new file mode 100644 index 00000000000..f1591479819 --- /dev/null +++ b/jstests/sharding/change_stream_against_shard_mongod.js @@ -0,0 +1,47 @@ +/** + * Tests that an updateLookup change stream on a sharded collection can be successfully opened + * and read from on a shard mongoD. Exercises the fix for SERVER-44977. + * @tags: [uses_change_streams, requires_find_command, requires_fcv_44] + */ +(function() { +"use strict"; + +// Start a new sharded cluster and obtain references to the test DB and collection. +const st = new ShardingTest({ + shards: 1, + mongos: 1, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosDB = st.s.getDB(jsTestName()); +const mongosColl = mongosDB.test; + +const shard0 = st.rs0; +const shard0Coll = shard0.getPrimary().getCollection(mongosColl.getFullName()); + +// Enable sharding on the the test database and ensure that the primary is shard0. +assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); +st.ensurePrimaryShard(mongosDB.getName(), shard0.getURL()); + +// Shard the source collection on {a: 1}. No need to split since it's single-shard. +st.shardColl(mongosColl, {a: 1}, false); + +// Open an updateLookup change stream on the collection, against the shard mongoD. +const csCursor = shard0Coll.watch([], {fullDocument: "updateLookup"}); + +// Write one document onto shard0, then do an op-style update which will require a lookup. +assert.commandWorked(mongosColl.insert({_id: 0, a: -100})); +assert.commandWorked(mongosColl.update({a: -100}, {$set: {updated: true}})); + +// Confirm that the stream opened against the shard mongoD sees both events. +const expectedEvents = + [{op: "insert", doc: {_id: 0, a: -100}}, {op: "update", doc: {_id: 0, a: -100, updated: true}}]; +for (let event of expectedEvents) { + assert.soon(() => csCursor.hasNext()); + const nextDoc = csCursor.next(); + assert.eq(nextDoc.operationType, event.op); + assert.docEq(nextDoc.fullDocument, event.doc); +} + +st.stop(); +})();
\ No newline at end of file diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 548add75062..5ba3a9b38c9 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -477,7 +477,13 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument( nss, collectionUUID, _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); + // When looking up on a mongoD, we only ever want to read from the local collection. By + // default, makePipeline will attach a cursor source which may read from remote if the + // collection is sharded, so we manually attach a local-only cursor source here. + MakePipelineOptions opts; + opts.attachCursorSource = false; + pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts); + pipeline = attachCursorSourceToPipelineForLocalRead(foreignExpCtx, pipeline.release()); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return boost::none; } |