Diffstat (limited to 'src/mongo/db/commands')
-rw-r--r--  src/mongo/db/commands/SConscript                              |   4
-rw-r--r--  src/mongo/db/commands/count_cmd.cpp                           |  10
-rw-r--r--  src/mongo/db/commands/dbcheck.cpp                             | 195
-rw-r--r--  src/mongo/db/commands/dbcommands.cpp                          |   8
-rw-r--r--  src/mongo/db/commands/dbcommands_d.cpp                        |   4
-rw-r--r--  src/mongo/db/commands/distinct.cpp                            |  11
-rw-r--r--  src/mongo/db/commands/external_data_source_commands_test.cpp  | 953
-rw-r--r--  src/mongo/db/commands/external_data_source_scope_guard.cpp    |  87
-rw-r--r--  src/mongo/db/commands/external_data_source_scope_guard.h      |  84
-rw-r--r--  src/mongo/db/commands/fail_point_cmd.cpp                      |   4
-rw-r--r--  src/mongo/db/commands/find_and_modify.cpp                     |  13
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp                            |  32
-rw-r--r--  src/mongo/db/commands/generic.cpp                             |   4
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp                         |  13
-rw-r--r--  src/mongo/db/commands/parameters.cpp                          |   5
-rw-r--r--  src/mongo/db/commands/pipeline_command.cpp                    |  24
-rw-r--r--  src/mongo/db/commands/profile_common.cpp                      |   1
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp                       |  22
-rw-r--r--  src/mongo/db/commands/run_aggregate.h                         |   4
-rw-r--r--  src/mongo/db/commands/server_status_command.cpp               |   4
-rw-r--r--  src/mongo/db/commands/tenant_migration_recipient_cmds.idl     |   2
21 files changed, 1342 insertions, 142 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 0074dc859c6..19de67d69b6 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -367,6 +367,7 @@ env.Library(
'$BUILD_DIR/mongo/db/repl/replica_set_messages',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/rw_concern_d',
+ '$BUILD_DIR/mongo/db/s/query_analysis_writer',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
@@ -789,6 +790,7 @@ env.CppUnitTest(
target="db_commands_test",
source=[
"create_indexes_test.cpp",
+ "external_data_source_commands_test.cpp",
"index_filter_commands_test.cpp",
"fle_compact_test.cpp",
"list_collections_filter_test.cpp",
@@ -819,7 +821,9 @@ env.CppUnitTest(
"$BUILD_DIR/mongo/db/repl/replmocks",
"$BUILD_DIR/mongo/db/repl/storage_interface_impl",
"$BUILD_DIR/mongo/db/service_context_d_test_fixture",
+ "$BUILD_DIR/mongo/db/storage/record_store_base",
'$BUILD_DIR/mongo/idl/idl_parser',
+ '$BUILD_DIR/mongo/util/version_impl',
"cluster_server_parameter_commands_invocation",
"core",
"create_command",
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 21ec11df16f..9a961c1ed0b 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/view_response_formatter.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/logv2/log.h"
#include "mongo/util/database_name_util.h"
@@ -249,6 +250,15 @@ public:
invocation->markMirrored();
}
+ if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addCountQuery(*request.getSampleId(),
+ nss,
+ request.getQuery(),
+ request.getCollation().value_or(BSONObj()))
+ .getAsync([](auto) {});
+ }
+
if (ctx->getView()) {
auto viewAggregation = countCommandAsAggregationCommand(request, nss);
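The same fire-and-forget sampling hook recurs below in distinct.cpp, find_cmd.cpp and find_and_modify.cpp, alongside the new query_analysis_writer.h include. A minimal sketch of the pattern, assuming the request type exposes getSampleId()/getQuery()/getCollation() the way the count request does; the empty getAsync callback deliberately ignores the status so a failed sample write never fails the user's command:

    // Hedged sketch, not part of the patch: persist a sampled count query when query
    // sampling is supported and the request carries a sampleId.
    void persistSampledCountQuery(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  const CountCommandRequest& request) {
        if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) {
            analyze_shard_key::QueryAnalysisWriter::get(opCtx)
                .addCountQuery(*request.getSampleId(),
                               nss,
                               request.getQuery(),
                               request.getCollation().value_or(BSONObj()))
                .getAsync([](auto) {});  // fire and forget; sampling must not affect the command
        }
    }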
diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp
index 1b1110abb29..f8aafcae484 100644
--- a/src/mongo/db/commands/dbcheck.cpp
+++ b/src/mongo/db/commands/dbcheck.cpp
@@ -65,9 +65,8 @@ repl::OpTime _logOp(OperationContext* opCtx,
repl::MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
oplogEntry.setNss(nss);
- if (uuid) {
- oplogEntry.setUuid(*uuid);
- }
+ oplogEntry.setTid(nss.tenantId());
+ oplogEntry.setUuid(uuid);
oplogEntry.setObject(obj);
AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
return writeConflictRetry(
@@ -144,7 +143,6 @@ struct DbCheckCollectionInfo {
int64_t maxDocsPerBatch;
int64_t maxBytesPerBatch;
int64_t maxBatchTimeMillis;
- bool snapshotRead;
WriteConcernOptions writeConcern;
};
@@ -167,6 +165,8 @@ std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx,
"Cannot run dbCheck on " + nss.toString() + " because it is not replicated",
nss.isReplicated());
+ uassert(6769500, "dbCheck no longer supports snapshotRead:false", invocation.getSnapshotRead());
+
const auto start = invocation.getMinKey();
const auto end = invocation.getMaxKey();
const auto maxCount = invocation.getMaxCount();
@@ -184,7 +184,6 @@ std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx,
maxDocsPerBatch,
maxBytesPerBatch,
maxBatchTimeMillis,
- invocation.getSnapshotRead(),
invocation.getBatchWriteConcern()};
auto result = std::make_unique<DbCheckRun>();
result->push_back(info);
@@ -201,6 +200,8 @@ std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx,
AutoGetDb agd(opCtx, dbName, MODE_IS);
uassert(ErrorCodes::NamespaceNotFound, "Database " + dbName.db() + " not found", agd.getDb());
+ uassert(6769501, "dbCheck no longer supports snapshotRead:false", invocation.getSnapshotRead());
+
const int64_t max = std::numeric_limits<int64_t>::max();
const auto rate = invocation.getMaxCountPerSecond();
const auto maxDocsPerBatch = invocation.getMaxDocsPerBatch();
@@ -220,7 +221,6 @@ std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx,
maxDocsPerBatch,
maxBytesPerBatch,
maxBatchTimeMillis,
- invocation.getSnapshotRead(),
invocation.getBatchWriteConcern()};
result->push_back(info);
return true;
@@ -241,7 +241,8 @@ std::unique_ptr<DbCheckRun> getRun(OperationContext* opCtx,
// Get rid of generic command fields.
for (const auto& elem : obj) {
- if (!isGenericArgument(elem.fieldNameStringData())) {
+ const auto& fieldName = elem.fieldNameStringData();
+ if (!isGenericArgument(fieldName)) {
builder.append(elem);
}
}
@@ -251,11 +252,17 @@ std::unique_ptr<DbCheckRun> getRun(OperationContext* opCtx,
// If the dbCheck argument is a string, this is the per-collection form.
if (toParse["dbCheck"].type() == BSONType::String) {
return singleCollectionRun(
- opCtx, dbName, DbCheckSingleInvocation::parse(IDLParserContext(""), toParse));
+ opCtx,
+ dbName,
+ DbCheckSingleInvocation::parse(
+ IDLParserContext("", false /*apiStrict*/, dbName.tenantId()), toParse));
} else {
// Otherwise, it's the database-wide form.
return fullDatabaseRun(
- opCtx, dbName, DbCheckAllInvocation::parse(IDLParserContext(""), toParse));
+ opCtx,
+ dbName,
+ DbCheckAllInvocation::parse(
+ IDLParserContext("", false /*apiStrict*/, dbName.tenantId()), toParse));
}
}
@@ -486,122 +493,77 @@ private:
const BSONKey& first,
int64_t batchDocs,
int64_t batchBytes) {
- auto lockMode = MODE_S;
- if (info.snapshotRead) {
- // Each batch will read at the latest no-overlap point, which is the all_durable
- // timestamp on primaries. We assume that the history window on secondaries is always
- // longer than the time it takes between starting and replicating a batch on the
- // primary. Otherwise, the readTimestamp will not be available on a secondary by the
- // time it processes the oplog entry.
- lockMode = MODE_IS;
- opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap);
+ // Each batch will read at the latest no-overlap point, which is the all_durable timestamp
+ // on primaries. We assume that the history window on secondaries is always longer than the
+ // time it takes between starting and replicating a batch on the primary. Otherwise, the
+ // readTimestamp will not be available on a secondary by the time it processes the oplog
+ // entry.
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap);
+
+ // dbCheck writes to the oplog, so we need to take an IX lock. We don't need to write to the
+ // collection, however, so we only take an intent lock on it.
+ Lock::GlobalLock glob(opCtx, MODE_IX);
+ AutoGetCollection collection(opCtx, info.nss, MODE_IS);
+
+ if (_stepdownHasOccurred(opCtx, info.nss)) {
+ _done = true;
+ return Status(ErrorCodes::PrimarySteppedDown, "dbCheck terminated due to stepdown");
}
- BatchStats result;
- auto timeoutMs = Milliseconds(gDbCheckCollectionTryLockTimeoutMillis.load());
- const auto initialBackoffMs =
- Milliseconds(gDbCheckCollectionTryLockMinBackoffMillis.load());
- auto backoffMs = initialBackoffMs;
- for (int attempt = 1;; attempt++) {
- try {
- // Try to acquire collection lock with increasing timeout and bounded exponential
- // backoff.
- auto const lockDeadline = Date_t::now() + timeoutMs;
- timeoutMs *= 2;
-
- AutoGetCollection agc(
- opCtx, info.nss, lockMode, AutoGetCollection::Options{}.deadline(lockDeadline));
-
- if (_stepdownHasOccurred(opCtx, info.nss)) {
- _done = true;
- return Status(ErrorCodes::PrimarySteppedDown,
- "dbCheck terminated due to stepdown");
- }
+ if (!collection) {
+ const auto msg = "Collection under dbCheck no longer exists";
+ return {ErrorCodes::NamespaceNotFound, msg};
+ }
- const auto& collection =
- CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, info.nss);
- if (!collection) {
- const auto msg = "Collection under dbCheck no longer exists";
- return {ErrorCodes::NamespaceNotFound, msg};
- }
+ auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
+ uassert(ErrorCodes::SnapshotUnavailable,
+ "No snapshot available yet for dbCheck",
+ readTimestamp);
+ auto minVisible = collection->getMinimumVisibleSnapshot();
+ if (minVisible && *readTimestamp < *collection->getMinimumVisibleSnapshot()) {
+ return {ErrorCodes::SnapshotUnavailable,
+ str::stream() << "Unable to read from collection " << info.nss
+ << " due to pending catalog changes"};
+ }
- auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
- auto minVisible = collection->getMinimumVisibleSnapshot();
- if (readTimestamp && minVisible &&
- *readTimestamp < *collection->getMinimumVisibleSnapshot()) {
- return {ErrorCodes::SnapshotUnavailable,
- str::stream() << "Unable to read from collection " << info.nss
- << " due to pending catalog changes"};
- }
+ boost::optional<DbCheckHasher> hasher;
+ try {
+ hasher.emplace(opCtx,
+ *collection,
+ first,
+ info.end,
+ std::min(batchDocs, info.maxCount),
+ std::min(batchBytes, info.maxSize));
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
- boost::optional<DbCheckHasher> hasher;
- try {
- hasher.emplace(opCtx,
- collection,
- first,
- info.end,
- std::min(batchDocs, info.maxCount),
- std::min(batchBytes, info.maxSize));
- } catch (const DBException& e) {
- return e.toStatus();
- }
+ const auto batchDeadline = Date_t::now() + Milliseconds(info.maxBatchTimeMillis);
+ Status status = hasher->hashAll(opCtx, batchDeadline);
- const auto batchDeadline = Date_t::now() + Milliseconds(info.maxBatchTimeMillis);
- Status status = hasher->hashAll(opCtx, batchDeadline);
+ if (!status.isOK()) {
+ return status;
+ }
- if (!status.isOK()) {
- return status;
- }
+ std::string md5 = hasher->total();
- std::string md5 = hasher->total();
-
- DbCheckOplogBatch batch;
- batch.setType(OplogEntriesEnum::Batch);
- batch.setNss(info.nss);
- batch.setMd5(md5);
- batch.setMinKey(first);
- batch.setMaxKey(BSONKey(hasher->lastKey()));
- batch.setReadTimestamp(readTimestamp);
-
- // Send information on this batch over the oplog.
- result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON());
- result.readTimestamp = readTimestamp;
-
- result.nDocs = hasher->docsSeen();
- result.nBytes = hasher->bytesSeen();
- result.lastKey = hasher->lastKey();
- result.md5 = md5;
-
- break;
- } catch (const ExceptionFor<ErrorCodes::LockTimeout>& e) {
- if (attempt > gDbCheckCollectionTryLockMaxAttempts.load()) {
- return StatusWith<BatchStats>(e.code(),
- "Unable to acquire the collection lock");
- }
+ DbCheckOplogBatch batch;
+ batch.setType(OplogEntriesEnum::Batch);
+ batch.setNss(info.nss);
+ batch.setMd5(md5);
+ batch.setMinKey(first);
+ batch.setMaxKey(BSONKey(hasher->lastKey()));
+ batch.setReadTimestamp(readTimestamp);
- // Bounded exponential backoff between tryLocks.
- opCtx->sleepFor(backoffMs);
- const auto maxBackoffMillis =
- Milliseconds(gDbCheckCollectionTryLockMaxBackoffMillis.load());
- if (backoffMs < maxBackoffMillis) {
- auto backoff = durationCount<Milliseconds>(backoffMs);
- auto initialBackoff = durationCount<Milliseconds>(initialBackoffMs);
- backoff *= initialBackoff;
- backoffMs = Milliseconds(backoff);
- }
- if (backoffMs > maxBackoffMillis) {
- backoffMs = maxBackoffMillis;
- }
- LOGV2_DEBUG(6175700,
- 1,
- "Could not acquire collection lock, retrying",
- "ns"_attr = info.nss.ns(),
- "batchRangeMin"_attr = info.start.obj(),
- "batchRangeMax"_attr = info.end.obj(),
- "attempt"_attr = attempt,
- "backoff"_attr = backoffMs);
- }
- }
+ // Send information on this batch over the oplog.
+ BatchStats result;
+ result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON());
+ result.readTimestamp = readTimestamp;
+
+ result.nDocs = hasher->docsSeen();
+ result.nBytes = hasher->bytesSeen();
+ result.lastKey = hasher->lastKey();
+ result.md5 = md5;
return result;
}
@@ -663,7 +625,6 @@ public:
" maxDocsPerBatch: <max number of docs/batch>\n"
" maxBytesPerBatch: <try to keep a batch within max bytes/batch>\n"
" maxBatchTimeMillis: <max time processing a batch in milliseconds>\n"
- " readTimestamp: <bool, read at a timestamp without strong locks> }\n"
"to check a collection.\n"
"Invoke with {dbCheck: 1} to check all collections in the database.";
}
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index 6374a093936..0b6f90d0203 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -485,6 +485,10 @@ public:
return Request::kCommandDescription.toString();
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
// Assume that appendCollectionStorageStats() gives us a valid response.
void validateResult(const BSONObj& resultObj) final {}
@@ -587,6 +591,10 @@ public:
CmdDbStats() : TypedCommand(Request::kCommandName, Request::kCommandAlias) {}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
diff --git a/src/mongo/db/commands/dbcommands_d.cpp b/src/mongo/db/commands/dbcommands_d.cpp
index 5f42cb45010..5ad0adf46c5 100644
--- a/src/mongo/db/commands/dbcommands_d.cpp
+++ b/src/mongo/db/commands/dbcommands_d.cpp
@@ -248,6 +248,10 @@ public:
return Status::OK();
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
bool run(OperationContext* opCtx,
const DatabaseName& dbName,
const BSONObj& jsobj,
diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp
index 070da8d285c..eb599424001 100644
--- a/src/mongo/db/commands/distinct.cpp
+++ b/src/mongo/db/commands/distinct.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/query/query_planner_common.h"
#include "mongo/db/query/view_response_formatter.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/logv2/log.h"
#include "mongo/util/database_name_util.h"
@@ -244,6 +245,16 @@ public:
invocation->markMirrored();
}
+ if (analyze_shard_key::supportsPersistingSampledQueries() && parsedDistinct.getSampleId()) {
+ auto cq = parsedDistinct.getQuery();
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addDistinctQuery(*parsedDistinct.getSampleId(),
+ nss,
+ cq->getQueryObj(),
+ cq->getFindCommandRequest().getCollation())
+ .getAsync([](auto) {});
+ }
+
if (ctx->getView()) {
// Relinquish locks. The aggregation command will re-acquire them.
ctx.reset();
diff --git a/src/mongo/db/commands/external_data_source_commands_test.cpp b/src/mongo/db/commands/external_data_source_commands_test.cpp
new file mode 100644
index 00000000000..113495efdc8
--- /dev/null
+++ b/src/mongo/db/commands/external_data_source_commands_test.cpp
@@ -0,0 +1,953 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <fmt/format.h>
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/dbclient_cursor.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
+#include "mongo/db/query/query_knobs_gen.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/storage/named_pipe.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+using namespace fmt::literals;
+
+class PipeWaiter {
+public:
+ void notify() {
+ {
+ stdx::unique_lock lk(m);
+ pipeCreated = true;
+ }
+ cv.notify_one();
+ }
+
+ void wait() {
+ stdx::unique_lock lk(m);
+ cv.wait(lk, [&] { return pipeCreated; });
+ }
+
+private:
+ Mutex m;
+ stdx::condition_variable cv;
+ bool pipeCreated = false;
+};
+
+class ExternalDataSourceCommandsTest : public ServiceContextMongoDTest {
+protected:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+
+ std::srand(std::time(0));
+
+ const auto service = getServiceContext();
+ auto replCoord =
+ std::make_unique<repl::ReplicationCoordinatorMock>(service, repl::ReplSettings{});
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ repl::ReplicationCoordinator::set(service, std::move(replCoord));
+ repl::createOplog(_opCtx);
+
+ computeModeEnabled = true;
+ }
+
+ void tearDown() override {
+ computeModeEnabled = false;
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ std::vector<BSONObj> generateRandomSimpleDocs(int count) {
+ std::vector<BSONObj> docs;
+ for (int i = 0; i < count; ++i) {
+ docs.emplace_back(BSON("a" << std::rand() % 10));
+ }
+
+ return docs;
+ }
+
+ // Generates a large readable random string to aid debugging.
+ std::string getRandomReadableLargeString() {
+ int count = std::rand() % 100 + 2024;
+ std::string str(count, '\0');
+ for (int i = 0; i < count; ++i) {
+ str[i] = static_cast<char>(std::rand() % 26) + 'a';
+ }
+
+ return str;
+ }
+
+ std::vector<BSONObj> generateRandomLargeDocs(int count) {
+ std::vector<BSONObj> docs;
+ for (int i = 0; i < count; ++i) {
+ docs.emplace_back(BSON("a" << getRandomReadableLargeString()));
+ }
+
+ return docs;
+ }
+
+ // This verifies that a simple explain aggregate command works. Virtual collections are created
+ // even for explain aggregate command.
+ void verifyExplainAggCommand(DBDirectClient& client, const BSONObj& explainAggCmdObj) {
+ // The first request.
+ BSONObj res;
+ ASSERT_TRUE(client.runCommand(kDatabaseName, explainAggCmdObj.getOwned(), res))
+ << "Expected to succeed but failed. result = {}"_format(res.toString());
+ // Sanity checks of result.
+ ASSERT_EQ(res["ok"].Number(), 1.0)
+ << "Expected to succeed but failed. result = {}"_format(res.toString());
+ }
+
+ ServiceContext::UniqueOperationContext _uniqueOpCtx{makeOperationContext()};
+ OperationContext* _opCtx{_uniqueOpCtx.get()};
+
+ BSONObj explainSingleNamedPipeAggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ explain: true,
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+ BSONObj explainMultipleNamedPipesAggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ explain: true,
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [
+ {url: "file://named_pipe1", storageType: "pipe", fileType: "bson"},
+ {url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}
+ ]
+ }]
+}
+ )");
+
+ static constexpr auto kDatabaseName = "external_data_source";
+};
+
+TEST_F(ExternalDataSourceCommandsTest, SimpleScanAggRequest) {
+ const auto nDocs = std::rand() % 100 + 1;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ pipeWriter.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ // The first request.
+ BSONObj res;
+ ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res));
+
+ // Sanity checks of result.
+ ASSERT_EQ(res["ok"].Number(), 1.0);
+ ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch"));
+
+ // The default batch size is 101 and so all data must be contained in the first batch. cursor.id
+ // == 0 means that no cursor is necessary.
+ ASSERT_TRUE(res["cursor"].Obj().hasField("id") && res["cursor"]["id"].Long() == 0);
+ auto resDocs = res["cursor"]["firstBatch"].Array();
+ ASSERT_EQ(resDocs.size(), nDocs);
+ for (int i = 0; i < nDocs; ++i) {
+ ASSERT_BSONOBJ_EQ(resDocs[i].Obj(), srcDocs[i]);
+ }
+
+ // The second request. This verifies that virtual collections are cleaned up after the
+ // aggregation request is done.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, SimpleScanOverMultipleNamedPipesAggRequest) {
+ // This data set fits into the first batch.
+ const auto nDocs = std::rand() % 50;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ // Pushes data into multiple named pipes. We can't push data into multiple named pipes
+ // simultaneously because writers will be blocked until the reader consumes data. So, we push
+ // data into one named pipe after another.
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter1("named_pipe1");
+ NamedPipeOutput pipeWriter2("named_pipe2");
+ pw.notify();
+ pipeWriter1.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter1.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter1.close();
+
+ pipeWriter2.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter2.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter2.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [
+ {url: "file://named_pipe1", storageType: "pipe", fileType: "bson"},
+ {url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}
+ ]
+ }]
+}
+ )");
+
+ // The first request.
+ BSONObj res;
+ ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res));
+
+ // Sanity checks of result.
+ ASSERT_EQ(res["ok"].Number(), 1.0);
+ ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch"));
+
+ // The default batch size is 101 and so all data must be contained in the first batch. cursor.id
+ // == 0 means that no cursor is necessary.
+ ASSERT_TRUE(res["cursor"].Obj().hasField("id") && res["cursor"]["id"].Long() == 0);
+ auto resDocs = res["cursor"]["firstBatch"].Array();
+ ASSERT_EQ(resDocs.size(), nDocs * 2);
+ for (int i = 0; i < nDocs; ++i) {
+ ASSERT_BSONOBJ_EQ(resDocs[i].Obj(), srcDocs[i % nDocs]);
+ }
+
+ // The second request. This verifies that virtual collections are cleaned up after the
+ // aggregation request is done.
+ verifyExplainAggCommand(client, explainMultipleNamedPipesAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, SimpleScanOverLargeObjectsAggRequest) {
+ // MultiBsonStreamCursor's default buffer size is 8K and 2K (at minimum) * 20 would be enough to
+ // exceed the initial read. This data set is highly likely to span multiple reads.
+ const auto nDocs = std::rand() % 80 + 20;
+ std::vector<BSONObj> srcDocs = generateRandomLargeDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ pipeWriter.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ // The first request.
+ BSONObj res;
+ ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res));
+
+ // Sanity checks of result.
+ ASSERT_EQ(res["ok"].Number(), 1.0);
+ ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch"));
+
+ // The default batch size is 101 and so all data must be contained in the first batch. cursor.id
+ // == 0 means that no cursor is necessary.
+ ASSERT_TRUE(res["cursor"].Obj().hasField("id") && res["cursor"]["id"].Long() == 0);
+ auto resDocs = res["cursor"]["firstBatch"].Array();
+ ASSERT_EQ(resDocs.size(), nDocs);
+ for (int i = 0; i < nDocs; ++i) {
+ ASSERT_BSONOBJ_EQ(resDocs[i].Obj(), srcDocs[i]);
+ }
+
+ // The second request. This verifies that virtual collections are cleaned up after the
+ // aggregation request is done.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+// Tests that 'explain' flag works and also tests that the same aggregation request works with the
+// same $_externalDataSources again to see whether there are no remaining virtual collections left
+// behind after the aggregation request is done.
+TEST_F(ExternalDataSourceCommandsTest, ExplainAggRequest) {
+ DBDirectClient client(_opCtx);
+ // The first request.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+
+ // The second request.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, SimpleScanMultiBatchAggRequest) {
+ // This 'nDocs' causes a cursor to be created for a simple scan aggregate command.
+ const auto nDocs = std::rand() % 100 + 102;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ pipeWriter.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj);
+ ASSERT_OK(swAggReq.getStatus());
+ auto swCursor = DBClientCursor::fromAggregationRequest(
+ &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false);
+ ASSERT_OK(swCursor.getStatus());
+
+ auto cursor = std::move(swCursor.getValue());
+ int resCnt = 0;
+ // While iterating over the cursor, getMore() request(s) will be sent and the server-side cursor
+ // will be destroyed after all data is exhausted.
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ ASSERT_BSONOBJ_EQ(doc, srcDocs[resCnt]);
+ ++resCnt;
+ }
+ ASSERT_EQ(resCnt, nDocs);
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // multi-batch result for an aggregation request.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, SimpleMatchAggRequest) {
+ const auto nDocs = std::rand() % 100 + 1;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ // Expected results for {$match: {a: {$lt: 5}}}.
+ std::vector<BSONObj> expectedDocs;
+ std::for_each(srcDocs.begin(), srcDocs.end(), [&](const BSONObj& doc) {
+ if (doc["a"].Int() < 5) {
+ expectedDocs.emplace_back(doc);
+ }
+ });
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ pipeWriter.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [{$match: {a: {$lt: 5}}}],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj);
+ ASSERT_OK(swAggReq.getStatus());
+ auto swCursor = DBClientCursor::fromAggregationRequest(
+ &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false);
+ ASSERT_OK(swCursor.getStatus());
+
+ auto cursor = std::move(swCursor.getValue());
+ int resCnt = 0;
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ ASSERT_BSONOBJ_EQ(doc, expectedDocs[resCnt]);
+ ++resCnt;
+ }
+ ASSERT_EQ(resCnt, expectedDocs.size());
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the aggregation request is done.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, ScanOverRandomInvalidDataAggRequest) {
+ const auto nDocs = std::rand() % 100 + 1;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ const size_t failPoint = std::rand() % nDocs;
+ pipeWriter.open();
+ for (size_t i = 0; i < srcDocs.size(); ++i) {
+ if (i == failPoint) {
+ // Intentionally pushes invalid data at the fail point so that an error happens at
+ // the reader-side
+ pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize() / 2);
+ } else {
+ pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize());
+ }
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [{$match: {a: {$lt: 5}}}],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ BSONObj res;
+ ASSERT_FALSE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res));
+ ASSERT_EQ(res["ok"].Number(), 0.0);
+ // The fail point is randomly chosen and different error codes are expected, depending on the
+ // chosen fail point.
+ ASSERT_NE(ErrorCodes::Error(res["code"].Int()), ErrorCodes::OK);
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the aggregation request fails.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, ScanOverRandomInvalidDataAtSecondBatchAggRequest) {
+ // This 'nDocs' causes a cursor to be created for a simple scan aggregate command.
+ const auto nDocs = std::rand() % 100 + 102; // 201 >= nDocs >= 102
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ // The fail point occurs at the second batch.
+ const size_t failPoint = 101 + std::rand() % (nDocs - 101); // 200 >= failPoint >= 101
+ pipeWriter.open();
+ for (size_t i = 0; i < srcDocs.size(); ++i) {
+ if (i == failPoint) {
+ // Intentionally pushes invalid data at the fail point so that an error happens at
+ // the reader-side
+ pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize() / 2);
+ } else {
+ pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize());
+ }
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj);
+ ASSERT_OK(swAggReq.getStatus());
+ auto swCursor = DBClientCursor::fromAggregationRequest(
+ &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false);
+ ASSERT_OK(swCursor.getStatus());
+
+ auto cursor = std::move(swCursor.getValue());
+ int resCnt = 0;
+ bool errorOccurred = false;
+ try {
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ ASSERT_BSONOBJ_EQ(doc, srcDocs[resCnt]);
+ ++resCnt;
+ }
+ } catch (const DBException& ex) {
+ errorOccurred = true;
+ ASSERT_NE(ex.code(), ErrorCodes::OK);
+ }
+ ASSERT_TRUE(errorOccurred);
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the getMore request for the aggregation results fails.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, KillCursorAfterAggRequest) {
+ // This 'nDocs' causes a cursor to be created for a simple scan aggregate command.
+ const auto nDocs = std::rand() % 100 + 102;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ pipeWriter.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ // The first request.
+ BSONObj res;
+ ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res));
+
+ // Sanity checks of result.
+ ASSERT_EQ(res["ok"].Number(), 1.0);
+ ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch"));
+
+ // The default batch size is 101 and results can be returned through multiple batches. cursor.id
+ // != 0 means that a cursor is created.
+ auto cursorId = res["cursor"]["id"].Long();
+ ASSERT_TRUE(res["cursor"].Obj().hasField("id") && cursorId != 0);
+
+ // Kills the cursor.
+ auto killCursorCmdObj = BSON("killCursors"
+ << "coll"
+ << "cursors" << BSON_ARRAY(cursorId));
+ ASSERT_TRUE(client.runCommand(kDatabaseName, killCursorCmdObj.getOwned(), res));
+ ASSERT_EQ(res["ok"].Number(), 1.0);
+ auto cursorsKilled = res["cursorsKilled"].Array();
+ ASSERT_TRUE(cursorsKilled.size() == 1 && cursorsKilled[0].Long() == cursorId);
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the cursor for the aggregate request is killed.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, SimpleScanAndUnionWithMultipleSourcesAggRequest) {
+ const auto nDocs = std::rand() % 100 + 1;
+ std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs);
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter1("named_pipe1");
+ pw.notify();
+ pipeWriter1.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter1.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter1.close();
+
+ NamedPipeOutput pipeWriter2("named_pipe2");
+ pipeWriter2.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter2.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter2.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ // An aggregate request with a simple scan and $unionWith stage. $_externalDataSources option
+ // defines multiple data sources.
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll1",
+ pipeline: [{$unionWith: "coll2"}],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll1",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }, {
+ collName: "coll2",
+ dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj);
+ ASSERT_OK(swAggReq.getStatus());
+ auto swCursor = DBClientCursor::fromAggregationRequest(
+ &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false);
+ ASSERT_OK(swCursor.getStatus());
+
+ auto cursor = std::move(swCursor.getValue());
+ int resCnt = 0;
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ // Simple scan from 'coll1' first and then $unionWith from 'coll2'.
+ ASSERT_BSONOBJ_EQ(doc, srcDocs[resCnt % nDocs]);
+ ++resCnt;
+ }
+ ASSERT_EQ(resCnt, nDocs * 2);
+
+ auto explainAggCmdObj = fromjson(R"(
+{
+ aggregate: "coll1",
+ pipeline: [{$unionWith: "coll2"}],
+ explain: true,
+ $_externalDataSources: [{
+ collName: "coll1",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }, {
+ collName: "coll2",
+ dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the aggregation request is done.
+ verifyExplainAggCommand(client, explainAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, GroupAggRequest) {
+ std::vector<BSONObj> srcDocs = {
+ fromjson(R"(
+ {
+ "_id" : 1,
+ "item" : "a",
+ "quantity" : 2
+ })"),
+ fromjson(R"(
+ {
+ "_id" : 2,
+ "item" : "b",
+ "quantity" : 1
+ })"),
+ fromjson(R"(
+ {
+ "_id" : 3,
+ "item" : "a",
+ "quantity" : 5
+ })"),
+ fromjson(R"(
+ {
+ "_id" : 4,
+ "item" : "b",
+ "quantity" : 10
+ })"),
+ fromjson(R"(
+ {
+ "_id" : 5,
+ "item" : "c",
+ "quantity" : 10
+ })"),
+ };
+ PipeWaiter pw;
+
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter("named_pipe1");
+ pw.notify();
+ pipeWriter.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll",
+ pipeline: [{$group: {_id: "$item", o: {$sum: "$quantity"}}}],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+ std::vector<BSONObj> expectedRes = {
+ fromjson(R"(
+ {
+ "_id" : "a",
+ "o" : 7
+ })"),
+ fromjson(R"(
+ {
+ "_id" : "b",
+ "o" : 11
+ })"),
+ fromjson(R"(
+ {
+ "_id" : "c",
+ "o" : 10
+ })"),
+ };
+
+ auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj);
+ ASSERT_OK(swAggReq.getStatus());
+ auto swCursor = DBClientCursor::fromAggregationRequest(
+ &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false);
+ ASSERT_OK(swCursor.getStatus());
+
+ auto cursor = std::move(swCursor.getValue());
+ int resCnt = 0;
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ // Result set is pretty small and so we use linear search of vector.
+ ASSERT_TRUE(
+ std::find_if(expectedRes.begin(), expectedRes.end(), [&](const BSONObj& expectedObj) {
+ return expectedObj.objsize() == doc.objsize() &&
+ std::memcmp(expectedObj.objdata(), doc.objdata(), expectedObj.objsize()) == 0;
+ }) != expectedRes.end());
+ ++resCnt;
+ }
+ ASSERT_EQ(resCnt, expectedRes.size());
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the aggregation request is done.
+ verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj);
+}
+
+TEST_F(ExternalDataSourceCommandsTest, LookupAggRequest) {
+ std::vector<BSONObj> srcDocs = {
+ fromjson(R"(
+ {
+ "a" : 1,
+ "data" : "abcd"
+ })"),
+ fromjson(R"(
+ {
+ "a" : 2,
+ "data" : "efgh"
+ })"),
+ fromjson(R"(
+ {
+ "a" : 3,
+ "data" : "ijkl"
+ })"),
+ };
+ PipeWaiter pw;
+
+    // For the $lookup stage, data must be available on both named pipes because $lookup reads
+    // from both collections, so we use two different named pipes and push data into the inner
+    // side first. To avoid a race condition, notify the reader side only after both named
+    // pipes are created. This ordering is geared toward the hash join algorithm.
+ stdx::thread producer([&] {
+ NamedPipeOutput pipeWriter2("named_pipe2");
+ NamedPipeOutput pipeWriter1("named_pipe1");
+ pw.notify();
+
+ // Pushes data into the inner side (== coll2 with named_pipe2) first because the hash join
+ // builds the inner (or build) side first.
+ pipeWriter2.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter2.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter2.close();
+
+ pipeWriter1.open();
+ for (auto&& srcDoc : srcDocs) {
+ pipeWriter1.write(srcDoc.objdata(), srcDoc.objsize());
+ }
+ pipeWriter1.close();
+ });
+ ON_BLOCK_EXIT([&] { producer.join(); });
+
+ // Gives some time to the producer so that it can initialize a named pipe.
+ pw.wait();
+
+ DBDirectClient client(_opCtx);
+ auto aggCmdObj = fromjson(R"(
+{
+ aggregate: "coll1",
+ pipeline: [{$lookup: {from: "coll2", localField: "a", foreignField: "a", as: "o"}}],
+ cursor: {},
+ $_externalDataSources: [{
+ collName: "coll1",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }, {
+ collName: "coll2",
+ dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+ std::vector<BSONObj> expectedRes = {
+ fromjson(R"(
+ {
+ "a" : 1,
+ "data" : "abcd",
+ "o" : [{"a": 1, "data": "abcd"}]
+ })"),
+ fromjson(R"(
+ {
+ "a" : 2,
+ "data" : "efgh",
+ "o" : [{"a": 2, "data": "efgh"}]
+ })"),
+ fromjson(R"(
+ {
+ "a" : 3,
+ "data" : "ijkl",
+ "o" : [{"a": 3, "data": "ijkl"}]
+ })"),
+ };
+
+ auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj);
+ ASSERT_OK(swAggReq.getStatus());
+ auto swCursor = DBClientCursor::fromAggregationRequest(
+ &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false);
+ ASSERT_OK(swCursor.getStatus());
+
+ auto cursor = std::move(swCursor.getValue());
+ int resCnt = 0;
+ while (cursor->more()) {
+ auto doc = cursor->next();
+ // Result set is pretty small and so we use linear search of vector.
+ ASSERT_TRUE(
+ std::find_if(expectedRes.begin(), expectedRes.end(), [&](const BSONObj& expectedObj) {
+ return expectedObj.objsize() == doc.objsize() &&
+ std::memcmp(expectedObj.objdata(), doc.objdata(), expectedObj.objsize()) == 0;
+ }) != expectedRes.end());
+ ++resCnt;
+ }
+ ASSERT_EQ(resCnt, expectedRes.size());
+
+ auto explainAggCmdObj = fromjson(R"(
+{
+ aggregate: "coll1",
+ pipeline: [{$lookup: {from: "coll2", localField: "a", foreignField: "a", as: "o"}}],
+ explain: true,
+ $_externalDataSources: [{
+ collName: "coll1",
+ dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}]
+ }, {
+ collName: "coll2",
+ dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}]
+ }]
+}
+ )");
+
+ // The second explain request. This verifies that virtual collections are cleaned up after
+ // the aggregation request is done.
+ verifyExplainAggCommand(client, explainAggCmdObj);
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/commands/external_data_source_scope_guard.cpp b/src/mongo/db/commands/external_data_source_scope_guard.cpp
new file mode 100644
index 00000000000..f9a29fcba67
--- /dev/null
+++ b/src/mongo/db/commands/external_data_source_scope_guard.cpp
@@ -0,0 +1,87 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/commands/external_data_source_scope_guard.h"
+
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/drop_collection.h"
+#include "mongo/db/catalog/virtual_collection_options.h"
+#include "mongo/db/drop_gen.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/external_data_source_option_gen.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/destructor_guard.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+namespace mongo {
+ExternalDataSourceScopeGuard::ExternalDataSourceScopeGuard(
+ OperationContext* opCtx,
+ const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>&
+ usedExternalDataSources)
+ : _opCtx(opCtx) {
+    // If creating any virtual collection fails, the destructor never gets a chance to run,
+    // so clean up whatever collections have already been created up to that point.
+ ScopeGuard dropVcollGuard([&] { dropVirtualCollections(); });
+
+ for (auto&& [extDataSourceNss, dataSources] : usedExternalDataSources) {
+ VirtualCollectionOptions vopts(dataSources);
+ uassertStatusOK(createVirtualCollection(opCtx, extDataSourceNss, vopts));
+ _toBeDroppedVirtualCollections.emplace_back(extDataSourceNss);
+ }
+
+ dropVcollGuard.dismiss();
+}
+
+void ExternalDataSourceScopeGuard::dropVirtualCollections() noexcept {
+ // The move constructor sets '_opCtx' to null when ownership is moved to the other object which
+ // means this object must not try to drop collections. There's nothing to drop if '_opCtx' is
+ // null.
+ if (!_opCtx) {
+ return;
+ }
+
+    // This function is called from a destructor or during exception handling, so guard it
+    // against any exceptions.
+ DESTRUCTOR_GUARD({
+ for (auto&& nss : _toBeDroppedVirtualCollections) {
+ DropReply reply;
+ auto status =
+ dropCollection(_opCtx,
+ nss,
+ &reply,
+ DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops);
+ if (!status.isOK()) {
+ LOGV2_ERROR(6968700, "Failed to drop an external data source", "coll"_attr = nss);
+ }
+ }
+ });
+}
+} // namespace mongo
diff --git a/src/mongo/db/commands/external_data_source_scope_guard.h b/src/mongo/db/commands/external_data_source_scope_guard.h
new file mode 100644
index 00000000000..21b48890432
--- /dev/null
+++ b/src/mongo/db/commands/external_data_source_scope_guard.h
@@ -0,0 +1,84 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/clientcursor.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/external_data_source_option_gen.h"
+
+namespace mongo {
+/**
+ * This class makes sure that virtual collections that are created for external data sources are
+ * dropped when it's destroyed.
+ */
+class ExternalDataSourceScopeGuard {
+public:
+ // Makes ExternalDataSourceScopeGuard a decoration of ClientCursor.
+ static const ClientCursor::Decoration<std::shared_ptr<ExternalDataSourceScopeGuard>> get;
+
+ // Updates the operation context of decorated ExternalDataSourceScopeGuard object of 'cursor'
+ // so that it can drop virtual collections in the new 'opCtx'.
+ static void updateOperationContext(const ClientCursor* cursor, OperationContext* opCtx) {
+ if (auto self = get(cursor); self) {
+ get(cursor)->_opCtx = opCtx;
+ }
+ }
+
+ ExternalDataSourceScopeGuard() : _opCtx(nullptr), _toBeDroppedVirtualCollections() {}
+
+ ExternalDataSourceScopeGuard(
+ OperationContext* opCtx,
+ const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>&
+ usedExternalDataSources);
+
+ // It does not make sense to support copy ctor because this object must drop created virtual
+ // collections.
+ ExternalDataSourceScopeGuard(const ExternalDataSourceScopeGuard&) = delete;
+
+ ExternalDataSourceScopeGuard(ExternalDataSourceScopeGuard&& other) noexcept
+ : _opCtx(other._opCtx),
+ _toBeDroppedVirtualCollections(std::move(other._toBeDroppedVirtualCollections)) {
+        // Ownership of the created virtual collections is moved to this object, so the other
+        // object must no longer try to drop them.
+ other._opCtx = nullptr;
+ }
+
+ ~ExternalDataSourceScopeGuard() {
+ dropVirtualCollections();
+ }
+
+private:
+ void dropVirtualCollections() noexcept;
+
+ OperationContext* _opCtx;
+ std::vector<NamespaceString> _toBeDroppedVirtualCollections;
+};
+} // namespace mongo
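The guard is attached to cursors as a ClientCursor decoration; the creation site lives in run_aggregate.cpp, whose hunks are not shown in this excerpt. A hypothetical usage sketch, assuming a pinned cursor is available once the aggregation cursor has been registered:

    // Create the virtual collections for this request; the constructor throws (and
    // self-cleans via its ScopeGuard) if any creation fails.
    auto guard = std::make_shared<ExternalDataSourceScopeGuard>(opCtx, usedExternalDataSources);

    // Hand ownership to the cursor so the collections are dropped when the cursor goes
    // away, whether by exhaustion, killCursors, or timeout.
    ExternalDataSourceScopeGuard::get(pinnedCursor.getCursor()) = std::move(guard);

    // Each getMore then re-binds the guard to its own operation context, as in the
    // getmore_cmd.cpp hunk below:
    ExternalDataSourceScopeGuard::updateOperationContext(pinnedCursor.getCursor(), opCtx);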
diff --git a/src/mongo/db/commands/fail_point_cmd.cpp b/src/mongo/db/commands/fail_point_cmd.cpp
index 7e868735f5b..a067522d6fe 100644
--- a/src/mongo/db/commands/fail_point_cmd.cpp
+++ b/src/mongo/db/commands/fail_point_cmd.cpp
@@ -93,6 +93,10 @@ public:
return Status::OK();
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
std::string help() const override {
return "modifies the settings of a fail point";
}
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 1ba9efa0a75..ec70924da40 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -60,6 +60,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
#include "mongo/db/stats/top.h"
@@ -76,6 +77,7 @@
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(failAllFindAndModify);
MONGO_FAIL_POINT_DEFINE(hangBeforeFindAndModifyPerformsUpdate);
/**
@@ -682,6 +684,16 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun(
}
}
+ if (analyze_shard_key::supportsPersistingSampledQueries() && request().getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addFindAndModifyQuery(request())
+ .getAsync([](auto) {});
+ }
+
+ if (MONGO_unlikely(failAllFindAndModify.shouldFail())) {
+ uasserted(ErrorCodes::InternalError, "failAllFindAndModify failpoint active!");
+ }
+
const bool inTransaction = opCtx->inMultiDocumentTransaction();
// Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it
@@ -711,6 +723,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun(
if (opCtx->getTxnNumber()) {
updateRequest.setStmtIds({stmtId});
}
+ updateRequest.setSampleId(req.getSampleId());
const ExtensionsCallbackReal extensionsCallback(
opCtx, &updateRequest.getNamespaceString());
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index d048a2ec616..1188981c991 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
@@ -65,6 +66,7 @@
#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/database_name_util.h"
#include "mongo/util/fail_point.h"
@@ -362,6 +364,8 @@ public:
// execution tree with an EOFStage.
const auto& collection = ctx->getCollection();
+ cq->setUseCqfIfEligible(true);
+
// Get the execution plan for the query.
bool permitYield = true;
auto exec =
@@ -433,10 +437,10 @@ public:
// The presence of a term in the request indicates that this is an internal replication
// oplog read request.
if (term && isOplogNss) {
- // We do not want to take tickets for internal (replication) oplog reads. Stalling
- // on ticket acquisition can cause complicated deadlocks. Primaries may depend on
- // data reaching secondaries in order to proceed; and secondaries may get stalled
- // replicating because of an inability to acquire a read ticket.
+ // We do not want to wait to take tickets for internal (replication) oplog reads.
+ // Stalling on ticket acquisition can cause complicated deadlocks. Primaries may
+ // depend on data reaching secondaries in order to proceed; and secondaries may get
+ // stalled replicating because of an inability to acquire a read ticket.
opCtx->lockState()->setAdmissionPriority(AdmissionContext::Priority::kImmediate);
}
@@ -481,6 +485,16 @@ public:
.expectedUUID(findCommand->getCollectionUUID()));
const auto& nss = ctx->getNss();
+ if (analyze_shard_key::supportsPersistingSampledQueries() &&
+ findCommand->getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addFindQuery(*findCommand->getSampleId(),
+ nss,
+ findCommand->getFilter(),
+ findCommand->getCollation())
+ .getAsync([](auto) {});
+ }
+
// Going forward this operation must never ignore interrupt signals while waiting for
// lock acquisition. This InterruptibleLockGuard will ensure that waiting for lock
// re-acquisition after yielding will not ignore interrupt signals. This is necessary to
@@ -542,10 +556,10 @@ public:
const auto& findCommand = cq->getFindCommandRequest();
auto viewAggregationCommand =
uassertStatusOK(query_request_helper::asAggregationCommand(findCommand));
-
- BSONObj aggResult = CommandHelpers::runCommandDirectly(
- opCtx,
- OpMsgRequest::fromDBAndBody(_dbName.db(), std::move(viewAggregationCommand)));
+ auto aggRequest =
+ OpMsgRequestBuilder::create(_dbName, std::move(viewAggregationCommand));
+ aggRequest.validatedTenancyScope = _request.validatedTenancyScope;
+ BSONObj aggResult = CommandHelpers::runCommandDirectly(opCtx, aggRequest);
auto status = getStatusFromCommandResult(aggResult);
if (status.code() == ErrorCodes::InvalidPipelineOperator) {
uasserted(ErrorCodes::InvalidPipelineOperator,
@@ -572,6 +586,8 @@ public:
opCtx->recoveryUnit()->setReadOnce(true);
}
+ cq->setUseCqfIfEligible(true);
+
// Get the execution plan for the query.
bool permitYield = true;
auto exec =
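
The view path above now builds the rewritten aggregate with OpMsgRequestBuilder and copies the original request's validatedTenancyScope onto it, where the old fromDBAndBody() construction dropped that information. The standalone sketch below illustrates the difference with placeholder types (Request here is not the real OpMsgRequest).

// Standalone sketch with placeholder types, only to show the propagation.
#include <iostream>
#include <optional>
#include <string>

struct Request {
    std::string db;
    std::string body;                         // stand-in for the BSON command body
    std::optional<std::string> tenancyScope;  // stand-in for validatedTenancyScope
};

// Old construction: only the database name and body survive the rewrite.
Request fromDBAndBody(std::string db, std::string body) {
    return Request{std::move(db), std::move(body), std::nullopt};
}

// New construction: explicitly carry the tenancy scope across the rewrite.
Request rewriteFindAsAgg(const Request& original, std::string aggBody) {
    Request agg{original.db, std::move(aggBody), std::nullopt};
    agg.tenancyScope = original.tenancyScope;  // the propagation this patch adds
    return agg;
}

int main() {
    Request find{"testdb", "{find: 'someView'}", std::string{"tenantA"}};
    Request lost = fromDBAndBody(find.db, "{aggregate: 'someView'}");
    Request kept = rewriteFindAsAgg(find, "{aggregate: 'someView'}");
    std::cout << "old rewrite keeps tenant: " << lost.tenancyScope.has_value() << "\n";  // 0
    std::cout << "new rewrite keeps tenant: " << kept.tenancyScope.has_value() << "\n";  // 1
    return 0;
}
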
diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp
index c5f8dde7e74..9bc1717f6e8 100644
--- a/src/mongo/db/commands/generic.cpp
+++ b/src/mongo/db/commands/generic.cpp
@@ -173,6 +173,10 @@ public:
return false;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
bool run(OperationContext* opCtx,
const DatabaseName&,
const BSONObj& cmdObj,
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 3d6c73db66f..09aa73cc315 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/external_data_source_scope_guard.h"
#include "mongo/db/curop.h"
#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/cursor_manager.h"
@@ -485,6 +486,10 @@ public:
setUpOperationContextStateForGetMore(
opCtx, *cursorPin.getCursor(), _cmd, disableAwaitDataFailpointActive);
+        // Update the opCtx of the decorated ExternalDataSourceScopeGuard object so that it can
+        // drop the virtual collections in the new 'opCtx'.
+ ExternalDataSourceScopeGuard::updateOperationContext(cursorPin.getCursor(), opCtx);
+
// On early return, typically due to a failed assertion, delete the cursor.
ScopeGuard cursorDeleter([&] { cursorPin.deleteUnderlying(); });
@@ -717,10 +722,10 @@ public:
// internal clients (see checkAuthForGetMore).
curOp->debug().isReplOplogGetMore = true;
- // We do not want to take tickets for internal (replication) oplog reads. Stalling
- // on ticket acquisition can cause complicated deadlocks. Primaries may depend on
- // data reaching secondaries in order to proceed; and secondaries may get stalled
- // replicating because of an inability to acquire a read ticket.
+ // We do not want to wait to take tickets for internal (replication) oplog reads.
+ // Stalling on ticket acquisition can cause complicated deadlocks. Primaries may
+ // depend on data reaching secondaries in order to proceed; and secondaries may get
+ // stalled replicating because of an inability to acquire a read ticket.
opCtx->lockState()->setAdmissionPriority(AdmissionContext::Priority::kImmediate);
}
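
updateOperationContext() re-points the cursor-decorated ExternalDataSourceScopeGuard at the getMore's operation context, since the opCtx it was created with (the original aggregate's) is gone by the time the guard may need to drop the virtual collections. A standalone sketch of that rebinding follows, with OperationContext and ScopeGuard as simplified placeholders.

// Standalone sketch with placeholder types; not the real MongoDB classes.
#include <iostream>
#include <string>
#include <vector>

struct OperationContext {
    std::string id;
};

class ScopeGuard {
public:
    ScopeGuard(OperationContext* opCtx, std::vector<std::string> vcolls)
        : _opCtx(opCtx), _vcolls(std::move(vcolls)) {}

    // Called at the start of each getMore so that cleanup uses the live opCtx
    // rather than the long-gone opCtx of the original aggregate.
    void updateOperationContext(OperationContext* opCtx) {
        _opCtx = opCtx;
    }

    ~ScopeGuard() {
        for (const auto& coll : _vcolls) {
            std::cout << "dropping " << coll << " under opCtx '" << _opCtx->id << "'\n";
        }
    }

private:
    OperationContext* _opCtx;
    std::vector<std::string> _vcolls;
};

int main() {
    OperationContext aggCtx{"aggregate"};
    OperationContext getMoreCtx{"getMore"};

    ScopeGuard guard(&aggCtx, {"virtualColl1", "virtualColl2"});
    guard.updateOperationContext(&getMoreCtx);  // rebinding done by the getMore path
    return 0;
}   // guard's destructor drops the virtual collections under the getMore opCtx
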
diff --git a/src/mongo/db/commands/parameters.cpp b/src/mongo/db/commands/parameters.cpp
index 0cfb0f6dcf2..0829fcd0e32 100644
--- a/src/mongo/db/commands/parameters.cpp
+++ b/src/mongo/db/commands/parameters.cpp
@@ -238,6 +238,11 @@ public:
h += "{ getParameter:'*' } or { getParameter:{allParameters: true} } to get everything\n";
return h;
}
+
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
bool run(OperationContext* opCtx,
const DatabaseName& dbName,
const BSONObj& cmdObj,
diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp
index b2fb0341568..13037602698 100644
--- a/src/mongo/db/commands/pipeline_command.cpp
+++ b/src/mongo/db/commands/pipeline_command.cpp
@@ -35,8 +35,8 @@
#include "mongo/db/auth/authorization_checks.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/create_collection.h"
-#include "mongo/db/catalog/virtual_collection_options.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/external_data_source_scope_guard.h"
#include "mongo/db/commands/run_aggregate.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
@@ -46,7 +46,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/idl/idl_parser.h"
-#include "mongo/stdx/unordered_set.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/database_name_util.h"
namespace mongo {
@@ -202,18 +202,20 @@ public:
CommandHelpers::handleMarkKillOnClientDisconnect(
opCtx, !Pipeline::aggHasWriteStage(_request.body));
- // TODO SERVER-69687 Create a virtual collection per each used external data source.
- for (auto&& [collName, dataSources] : _usedExternalDataSources) {
- VirtualCollectionOptions vopts(dataSources);
- }
-
+        // Create virtual collections and drop them when the aggregate command is done.
+        // Conceptually, ownership of the virtual collections moves into runAggregate()
+        // together with 'dropVcollGuard', so that it can clean them up when it is done
+        // with them. ExternalDataSourceScopeGuard also handles cleanup if any of the
+        // collections could not be created.
+ ExternalDataSourceScopeGuard dropVcollGuard(opCtx, _usedExternalDataSources);
uassertStatusOK(runAggregate(opCtx,
_aggregationRequest.getNamespace(),
_aggregationRequest,
_liteParsedPipeline,
_request.body,
_privileges,
- reply));
+ reply,
+ std::move(dropVcollGuard)));
// The aggregate command's response is unstable when 'explain' or 'exchange' fields are
// set.
@@ -231,14 +233,16 @@ public:
void explain(OperationContext* opCtx,
ExplainOptions::Verbosity verbosity,
rpc::ReplyBuilderInterface* result) override {
-
+ // See run() method for details.
+ ExternalDataSourceScopeGuard dropVcollGuard(opCtx, _usedExternalDataSources);
uassertStatusOK(runAggregate(opCtx,
_aggregationRequest.getNamespace(),
_aggregationRequest,
_liteParsedPipeline,
_request.body,
_privileges,
- result));
+ result,
+ std::move(dropVcollGuard)));
}
void doCheckAuthorization(OperationContext* opCtx) const override {
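
Both run() and explain() now construct the guard up front and move it into runAggregate(), so the callee owns cleanup of the virtual collections on every path, including early errors. The following standalone sketch shows the move-ownership shape with a placeholder VCollGuard; it is not the real ExternalDataSourceScopeGuard.

// Standalone sketch of the move-ownership handoff, using placeholder types.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

class VCollGuard {
public:
    explicit VCollGuard(std::vector<std::string> colls) : _colls(std::move(colls)) {}

    VCollGuard(VCollGuard&& other) noexcept : _colls(std::move(other._colls)) {
        other._colls.clear();  // the moved-from guard no longer owns anything
    }

    ~VCollGuard() {
        for (const auto& c : _colls) {
            std::cout << "dropping virtual collection " << c << "\n";
        }
    }

private:
    std::vector<std::string> _colls;
};

void runAggregateSketch(VCollGuard guard) {
    // ... build executors, register cursors, return results ...
}   // guard destroyed here (or handed on to cursors), dropping the collections

int main() {
    VCollGuard dropVcollGuard({"extDataSrc1", "extDataSrc2"});
    runAggregateSketch(std::move(dropVcollGuard));
    // dropVcollGuard is now empty; cleanup already happened inside the callee.
    return 0;
}
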
diff --git a/src/mongo/db/commands/profile_common.cpp b/src/mongo/db/commands/profile_common.cpp
index 54223b8f5a7..ba546e62067 100644
--- a/src/mongo/db/commands/profile_common.cpp
+++ b/src/mongo/db/commands/profile_common.cpp
@@ -133,6 +133,7 @@ bool ProfileCmdBase::run(OperationContext* opCtx,
newState.append("filter"_sd, newSettings.filter->serialize());
}
attrs.add("to", newState.obj());
+ attrs.add("db", dbName.db());
LOGV2(48742, "Profiler settings changed", attrs);
}
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index eb32a6c220b..cb15cf82247 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/change_stream_serverless_helpers.h"
+#include "mongo/db/commands/external_data_source_scope_guard.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
@@ -81,6 +82,7 @@
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/query_analysis_writer.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/resource_consumption_metrics.h"
@@ -659,7 +661,7 @@ Status runAggregate(OperationContext* opCtx,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
rpc::ReplyBuilderInterface* result) {
- return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result);
+ return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result, {});
}
Status runAggregate(OperationContext* opCtx,
@@ -668,7 +670,8 @@ Status runAggregate(OperationContext* opCtx,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
- rpc::ReplyBuilderInterface* result) {
+ rpc::ReplyBuilderInterface* result,
+ ExternalDataSourceScopeGuard externalDataSourceGuard) {
// Perform some validations on the LiteParsedPipeline and request before continuing with the
// aggregation command.
@@ -945,6 +948,15 @@ Status runAggregate(OperationContext* opCtx,
}
}
+ if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) {
+ analyze_shard_key::QueryAnalysisWriter::get(opCtx)
+ .addAggregateQuery(*request.getSampleId(),
+ expCtx->ns,
+ pipeline->getInitialQuery(),
+ expCtx->getCollatorBSON())
+ .getAsync([](auto) {});
+ }
+
// If the aggregate command supports encrypted collections, do rewrites of the pipeline to
// support querying against encrypted fields.
if (shouldDoFLERewrite(request)) {
@@ -1021,6 +1033,8 @@ Status runAggregate(OperationContext* opCtx,
p.deleteUnderlying();
}
});
+ auto extDataSrcGuard =
+ std::make_shared<ExternalDataSourceScopeGuard>(std::move(externalDataSourceGuard));
for (auto&& exec : execs) {
ClientCursorParams cursorParams(
std::move(exec),
@@ -1038,6 +1052,10 @@ Status runAggregate(OperationContext* opCtx,
pin->incNBatches();
cursors.emplace_back(pin.getCursor());
+    // All cursors share ownership of 'extDataSrcGuard'; when the last cursor is destroyed,
+    // 'extDataSrcGuard' is destroyed with it and the created virtual collections are dropped
+    // by the destructor of ExternalDataSourceScopeGuard.
+ ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard;
pins.emplace_back(std::move(pin));
}
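
Once cursors are registered, runAggregate() wraps the guard in a shared_ptr and attaches it to every cursor, so cleanup happens exactly once, when the last surviving cursor is destroyed. A standalone sketch of that shared-ownership arrangement follows, using placeholder Guard and Cursor types.

// Standalone sketch of last-owner cleanup via shared_ptr; placeholder types only.
#include <iostream>
#include <memory>
#include <vector>

struct Guard {
    ~Guard() {
        std::cout << "dropping virtual collections\n";
    }
};

struct Cursor {
    std::shared_ptr<Guard> extDataSrcGuard;  // per-cursor reference to the shared guard
};

int main() {
    auto guard = std::make_shared<Guard>();
    std::vector<Cursor> cursors{{guard}, {guard}, {guard}};

    guard.reset();       // runAggregate's own reference goes away: nothing dropped yet
    cursors.pop_back();  // destroying one cursor still keeps the guard alive
    std::cout << "two cursors left\n";
    cursors.clear();     // the last owner is released -> ~Guard runs exactly once
    return 0;
}
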
diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h
index 0bb86ac91b0..33c03e80cdb 100644
--- a/src/mongo/db/commands/run_aggregate.h
+++ b/src/mongo/db/commands/run_aggregate.h
@@ -32,6 +32,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/auth/privilege.h"
+#include "mongo/db/commands/external_data_source_scope_guard.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
@@ -57,7 +58,8 @@ Status runAggregate(OperationContext* opCtx,
const LiteParsedPipeline& liteParsedPipeline,
const BSONObj& cmdObj,
const PrivilegeVector& privileges,
- rpc::ReplyBuilderInterface* result);
+ rpc::ReplyBuilderInterface* result,
+ ExternalDataSourceScopeGuard externalDataSourceGuard);
/**
* Convenience version that internally constructs the LiteParsedPipeline.
diff --git a/src/mongo/db/commands/server_status_command.cpp b/src/mongo/db/commands/server_status_command.cpp
index 62433e57e62..521e4a49010 100644
--- a/src/mongo/db/commands/server_status_command.cpp
+++ b/src/mongo/db/commands/server_status_command.cpp
@@ -78,6 +78,10 @@ public:
return Status::OK();
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
bool run(OperationContext* opCtx,
const DatabaseName& dbName,
const BSONObj& cmdObj,
diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
index 96bfcd775cd..c8f58df0c0c 100644
--- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
+++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl
@@ -97,6 +97,7 @@ commands:
strict: true
namespace: ignored
api_version: ""
+ reply_type: recipientSyncDataResponse
inline_chained_structs: true
chained_structs:
MigrationRecipientCommonData: MigrationRecipientCommonData
@@ -144,6 +145,7 @@ commands:
recipientForgetMigration:
description: "Parser for the 'recipientForgetMigration' command."
command_name: recipientForgetMigration
+ reply_type: OkReply
strict: true
namespace: ignored
api_version: ""