summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@mongodb.com>2019-12-10 03:12:43 +0000
committerevergreen <evergreen@mongodb.com>2019-12-10 03:12:43 +0000
commitfa76bfb04a062ce031e4d531b9e50a27e1bc2f76 (patch)
treeeed06b5ba5aaf5d7da3199b90ab35ed2bd6c64a7
parenta822210dd9c28d62e5ce038b214206eb17d1a9bb (diff)
downloadmongo-fa76bfb04a062ce031e4d531b9e50a27e1bc2f76.tar.gz
SERVER-44977 Allow $changeStream with updateLookup to run directly against a shard mongoD
-rw-r--r--jstests/sharding/change_stream_against_shard_mongod.js47
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp8
2 files changed, 54 insertions, 1 deletions
diff --git a/jstests/sharding/change_stream_against_shard_mongod.js b/jstests/sharding/change_stream_against_shard_mongod.js
new file mode 100644
index 00000000000..f1591479819
--- /dev/null
+++ b/jstests/sharding/change_stream_against_shard_mongod.js
@@ -0,0 +1,47 @@
+/**
+ * Tests that an updateLookup change stream on a sharded collection can be successfully opened
+ * and read from on a shard mongoD. Exercises the fix for SERVER-44977.
+ * @tags: [uses_change_streams, requires_find_command, requires_fcv_44]
+ */
+(function() {
+"use strict";
+
+// Start a new sharded cluster and obtain references to the test DB and collection.
+const st = new ShardingTest({
+ shards: 1,
+ mongos: 1,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const mongosDB = st.s.getDB(jsTestName());
+const mongosColl = mongosDB.test;
+
+const shard0 = st.rs0;
+const shard0Coll = shard0.getPrimary().getCollection(mongosColl.getFullName());
+
+// Enable sharding on the the test database and ensure that the primary is shard0.
+assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+st.ensurePrimaryShard(mongosDB.getName(), shard0.getURL());
+
+// Shard the source collection on {a: 1}. No need to split since it's single-shard.
+st.shardColl(mongosColl, {a: 1}, false);
+
+// Open an updateLookup change stream on the collection, against the shard mongoD.
+const csCursor = shard0Coll.watch([], {fullDocument: "updateLookup"});
+
+// Write one document onto shard0, then do an op-style update which will require a lookup.
+assert.commandWorked(mongosColl.insert({_id: 0, a: -100}));
+assert.commandWorked(mongosColl.update({a: -100}, {$set: {updated: true}}));
+
+// Confirm that the stream opened against the shard mongoD sees both events.
+const expectedEvents =
+ [{op: "insert", doc: {_id: 0, a: -100}}, {op: "update", doc: {_id: 0, a: -100, updated: true}}];
+for (let event of expectedEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const nextDoc = csCursor.next();
+ assert.eq(nextDoc.operationType, event.op);
+ assert.docEq(nextDoc.fullDocument, event.doc);
+}
+
+st.stop();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 548add75062..5ba3a9b38c9 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -477,7 +477,13 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument(
nss,
collectionUUID,
_getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx);
+ // When looking up on a mongoD, we only ever want to read from the local collection. By
+ // default, makePipeline will attach a cursor source which may read from remote if the
+ // collection is sharded, so we manually attach a local-only cursor source here.
+ MakePipelineOptions opts;
+ opts.attachCursorSource = false;
+ pipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts);
+ pipeline = attachCursorSourceToPipelineForLocalRead(foreignExpCtx, pipeline.release());
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
}