summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/change_stream.js10
-rw-r--r--jstests/change_streams/change_stream_collation.js4
-rw-r--r--jstests/noPassthrough/unsupported_change_stream_deployments.js12
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp11
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp10
5 files changed, 42 insertions, 5 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 80356aa3b48..59e8df872f1 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -6,10 +6,18 @@
load("jstests/libs/change_stream_util.js");
load('jstests/libs/uuid_util.js');
+ jsTestLog("Testing $changeStream on non-existent database");
+ const dbDoesNotExist = db.getSiblingDB("database-does-not-exist");
+ assert.commandWorked(dbDoesNotExist.dropDatabase());
+ assert.commandFailedWithCode(
+ dbDoesNotExist.runCommand(
+ {aggregate: dbDoesNotExist.getName(), pipeline: [{$changeStream: {}}], cursor: {}}),
+ ErrorCodes.NamespaceNotFound);
+
let cst = new ChangeStreamTest(db);
jsTestLog("Testing single insert");
- assertDropCollection(db, "t1");
+ assertDropAndRecreateCollection(db, "t1");
let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
// Test that if there are no changes, we return an empty batch.
assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js
index 314e7b08954..b20d33e1981 100644
--- a/jstests/change_streams/change_stream_collation.js
+++ b/jstests/change_streams/change_stream_collation.js
@@ -11,6 +11,10 @@
const caseInsensitive = {locale: "en_US", strength: 2};
+ // $changeStream cannot run on a non-existent database. Create an unrelated collection to ensure
+ // that the database is present before testing.
+ assertDropAndRecreateCollection(db, "change_stream_ensure_db_exists");
+
let caseInsensitiveCollection = "change_stream_case_insensitive";
assertDropCollection(db, caseInsensitiveCollection);
diff --git a/jstests/noPassthrough/unsupported_change_stream_deployments.js b/jstests/noPassthrough/unsupported_change_stream_deployments.js
index 82a497bf19b..90b494654e8 100644
--- a/jstests/noPassthrough/unsupported_change_stream_deployments.js
+++ b/jstests/noPassthrough/unsupported_change_stream_deployments.js
@@ -22,18 +22,22 @@
const conn = MongoRunner.runMongod({enableMajorityReadConcern: ""});
assert.neq(null, conn, "mongod was unable to start up");
+ // $changeStream cannot run on a non-existent database.
+ assert.writeOK(conn.getDB("test").ensure_db_exists.insert({}));
assertChangeStreamNotSupportedOnConnection(conn);
assert.eq(0, MongoRunner.stopMongod(conn));
// Test master/slave deployments.
const masterSlaveFixture = new ReplTest("change_stream");
const master = masterSlaveFixture.start(true, {enableMajorityReadConcern: ""});
+ assert.writeOK(master.getDB("test").ensure_db_exists.insert({}));
assertChangeStreamNotSupportedOnConnection(master);
const slave = masterSlaveFixture.start(false);
// Slaves start in FCV 3.4; we need to wait for it to sync the FCV document from the master
// before trying a change stream, or the change stream will fail for the wrong reason.
assert.soonNoExcept(() => checkFCV(slave.getDB("admin"), "3.6") || true);
+ assert.soonNoExcept(() => slave.getDB("test").ensure_db_exists.exists());
assertChangeStreamNotSupportedOnConnection(slave);
// Test a sharded cluster with standalone shards.
@@ -41,8 +45,14 @@
{shards: 2, other: {shardOptions: {enableMajorityReadConcern: ""}}, config: 1});
// Make sure the database exists before running any commands.
const mongosDB = clusterWithStandalones.getDB("test");
- assert.writeOK(mongosDB.unrelated.insert({}));
+ // enableSharding will create the db at the cluster level but not on the shards. $changeStream
+ // through mongoS will be allowed to run on the shards despite the lack of a database.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: "test"}));
assertChangeStreamNotSupportedOnConnection(clusterWithStandalones.s);
+ // Shard the 'ensure_db_exists' collection on a hashed key before running $changeStream on the
+ // shards directly. This will ensure that the database is created on both shards.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: "test.ensure_db_exists", key: {_id: "hashed"}}));
assertChangeStreamNotSupportedOnConnection(clusterWithStandalones.shard0);
assertChangeStreamNotSupportedOnConnection(clusterWithStandalones.shard1);
}());
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 7c430629d9b..4406c84240e 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -362,9 +362,16 @@ Status runAggregate(OperationContext* opCtx,
// of the collection on which $changeStream was invoked, so that we do not end up
// resolving the collation on the oplog.
invariant(!collatorToUse);
- // Change streams can only be created on collections. An error will be raised in
- // AutoGetCollection if the given namespace is a view.
+ // Change streams can only be run against collections; AutoGetCollection will raise an
+ // error if the given namespace is a view. A change stream may be opened on a namespace
+ // before the associated collection is created, but only if the database already exists.
+ // If the $changeStream was sent from mongoS then the database exists at the cluster
+ // level even if not yet present on this shard, so we allow the $changeStream to run.
AutoGetCollection origNssCtx(opCtx, origNss, MODE_IS);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "cannot open $changeStream for non-existent database: "
+ << origNss.db(),
+ origNssCtx.getDb() || request.isFromMongos());
Collection* origColl = origNssCtx.getCollection();
collatorToUse.emplace(resolveCollator(opCtx, request, origColl));
}
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 8a32c3967a8..2654463431a 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -661,7 +661,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto executionNsRoutingInfoStatus =
getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache);
+ LiteParsedPipeline liteParsedPipeline(request);
+
if (!executionNsRoutingInfoStatus.isOK()) {
+ // Standard aggregations swallow 'NamespaceNotFound' and return an empty cursor with id 0 in
+ // the event that the database does not exist. For $changeStream aggregations, however, we
+ // throw the exception in all error cases, including that of a non-existent database.
+ uassert(executionNsRoutingInfoStatus.getStatus().code(),
+ str::stream() << "failed to open $changeStream: "
+ << executionNsRoutingInfoStatus.getStatus().reason(),
+ !liteParsedPipeline.hasChangeStream());
appendEmptyResultSet(
*result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns());
return Status::OK();
@@ -678,7 +687,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// need to check if any involved collections are sharded before forwarding an aggregation
// command on an unsharded collection.
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- LiteParsedPipeline liteParsedPipeline(request);
for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =