Diffstat (limited to 'src/mongo/db/commands')
21 files changed, 1342 insertions, 142 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 0074dc859c6..19de67d69b6 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -367,6 +367,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/replica_set_messages', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/db/s/query_analysis_writer', '$BUILD_DIR/mongo/db/server_base', '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/session/session_catalog_mongod', @@ -789,6 +790,7 @@ env.CppUnitTest( target="db_commands_test", source=[ "create_indexes_test.cpp", + "external_data_source_commands_test.cpp", "index_filter_commands_test.cpp", "fle_compact_test.cpp", "list_collections_filter_test.cpp", @@ -819,7 +821,9 @@ env.CppUnitTest( "$BUILD_DIR/mongo/db/repl/replmocks", "$BUILD_DIR/mongo/db/repl/storage_interface_impl", "$BUILD_DIR/mongo/db/service_context_d_test_fixture", + "$BUILD_DIR/mongo/db/storage/record_store_base", '$BUILD_DIR/mongo/idl/idl_parser', + '$BUILD_DIR/mongo/util/version_impl', "cluster_server_parameter_commands_invocation", "core", "create_command", diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp index 21ec11df16f..9a961c1ed0b 100644 --- a/src/mongo/db/commands/count_cmd.cpp +++ b/src/mongo/db/commands/count_cmd.cpp @@ -44,6 +44,7 @@ #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/view_response_formatter.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/logv2/log.h" #include "mongo/util/database_name_util.h" @@ -249,6 +250,15 @@ public: invocation->markMirrored(); } + if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addCountQuery(*request.getSampleId(), + nss, + request.getQuery(), + request.getCollation().value_or(BSONObj())) + .getAsync([](auto) {}); + } + if (ctx->getView()) { auto viewAggregation = countCommandAsAggregationCommand(request, nss); diff --git a/src/mongo/db/commands/dbcheck.cpp b/src/mongo/db/commands/dbcheck.cpp index 1b1110abb29..f8aafcae484 100644 --- a/src/mongo/db/commands/dbcheck.cpp +++ b/src/mongo/db/commands/dbcheck.cpp @@ -65,9 +65,8 @@ repl::OpTime _logOp(OperationContext* opCtx, repl::MutableOplogEntry oplogEntry; oplogEntry.setOpType(repl::OpTypeEnum::kCommand); oplogEntry.setNss(nss); - if (uuid) { - oplogEntry.setUuid(*uuid); - } + oplogEntry.setTid(nss.tenantId()); + oplogEntry.setUuid(uuid); oplogEntry.setObject(obj); AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); return writeConflictRetry( @@ -144,7 +143,6 @@ struct DbCheckCollectionInfo { int64_t maxDocsPerBatch; int64_t maxBytesPerBatch; int64_t maxBatchTimeMillis; - bool snapshotRead; WriteConcernOptions writeConcern; }; @@ -167,6 +165,8 @@ std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx, "Cannot run dbCheck on " + nss.toString() + " because it is not replicated", nss.isReplicated()); + uassert(6769500, "dbCheck no longer supports snapshotRead:false", invocation.getSnapshotRead()); + const auto start = invocation.getMinKey(); const auto end = invocation.getMaxKey(); const auto maxCount = invocation.getMaxCount(); @@ -184,7 +184,6 @@ std::unique_ptr<DbCheckRun> singleCollectionRun(OperationContext* opCtx, maxDocsPerBatch, maxBytesPerBatch, maxBatchTimeMillis, - invocation.getSnapshotRead(), invocation.getBatchWriteConcern()}; auto result = 
std::make_unique<DbCheckRun>(); result->push_back(info); @@ -201,6 +200,8 @@ std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx, AutoGetDb agd(opCtx, dbName, MODE_IS); uassert(ErrorCodes::NamespaceNotFound, "Database " + dbName.db() + " not found", agd.getDb()); + uassert(6769501, "dbCheck no longer supports snapshotRead:false", invocation.getSnapshotRead()); + const int64_t max = std::numeric_limits<int64_t>::max(); const auto rate = invocation.getMaxCountPerSecond(); const auto maxDocsPerBatch = invocation.getMaxDocsPerBatch(); @@ -220,7 +221,6 @@ std::unique_ptr<DbCheckRun> fullDatabaseRun(OperationContext* opCtx, maxDocsPerBatch, maxBytesPerBatch, maxBatchTimeMillis, - invocation.getSnapshotRead(), invocation.getBatchWriteConcern()}; result->push_back(info); return true; @@ -241,7 +241,8 @@ std::unique_ptr<DbCheckRun> getRun(OperationContext* opCtx, // Get rid of generic command fields. for (const auto& elem : obj) { - if (!isGenericArgument(elem.fieldNameStringData())) { + const auto& fieldName = elem.fieldNameStringData(); + if (!isGenericArgument(fieldName)) { builder.append(elem); } } @@ -251,11 +252,17 @@ std::unique_ptr<DbCheckRun> getRun(OperationContext* opCtx, // If the dbCheck argument is a string, this is the per-collection form. if (toParse["dbCheck"].type() == BSONType::String) { return singleCollectionRun( - opCtx, dbName, DbCheckSingleInvocation::parse(IDLParserContext(""), toParse)); + opCtx, + dbName, + DbCheckSingleInvocation::parse( + IDLParserContext("", false /*apiStrict*/, dbName.tenantId()), toParse)); } else { // Otherwise, it's the database-wide form. return fullDatabaseRun( - opCtx, dbName, DbCheckAllInvocation::parse(IDLParserContext(""), toParse)); + opCtx, + dbName, + DbCheckAllInvocation::parse( + IDLParserContext("", false /*apiStrict*/, dbName.tenantId()), toParse)); } } @@ -486,122 +493,77 @@ private: const BSONKey& first, int64_t batchDocs, int64_t batchBytes) { - auto lockMode = MODE_S; - if (info.snapshotRead) { - // Each batch will read at the latest no-overlap point, which is the all_durable - // timestamp on primaries. We assume that the history window on secondaries is always - // longer than the time it takes between starting and replicating a batch on the - // primary. Otherwise, the readTimestamp will not be available on a secondary by the - // time it processes the oplog entry. - lockMode = MODE_IS; - opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap); + // Each batch will read at the latest no-overlap point, which is the all_durable timestamp + // on primaries. We assume that the history window on secondaries is always longer than the + // time it takes between starting and replicating a batch on the primary. Otherwise, the + // readTimestamp will not be available on a secondary by the time it processes the oplog + // entry. + opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoOverlap); + + // dbCheck writes to the oplog, so we need to take an IX lock. We don't need to write to the + // collection, however, so we only take an intent lock on it. 
+ Lock::GlobalLock glob(opCtx, MODE_IX); + AutoGetCollection collection(opCtx, info.nss, MODE_IS); + + if (_stepdownHasOccurred(opCtx, info.nss)) { + _done = true; + return Status(ErrorCodes::PrimarySteppedDown, "dbCheck terminated due to stepdown"); } - BatchStats result; - auto timeoutMs = Milliseconds(gDbCheckCollectionTryLockTimeoutMillis.load()); - const auto initialBackoffMs = - Milliseconds(gDbCheckCollectionTryLockMinBackoffMillis.load()); - auto backoffMs = initialBackoffMs; - for (int attempt = 1;; attempt++) { - try { - // Try to acquire collection lock with increasing timeout and bounded exponential - // backoff. - auto const lockDeadline = Date_t::now() + timeoutMs; - timeoutMs *= 2; - - AutoGetCollection agc( - opCtx, info.nss, lockMode, AutoGetCollection::Options{}.deadline(lockDeadline)); - - if (_stepdownHasOccurred(opCtx, info.nss)) { - _done = true; - return Status(ErrorCodes::PrimarySteppedDown, - "dbCheck terminated due to stepdown"); - } + if (!collection) { + const auto msg = "Collection under dbCheck no longer exists"; + return {ErrorCodes::NamespaceNotFound, msg}; + } - const auto& collection = - CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, info.nss); - if (!collection) { - const auto msg = "Collection under dbCheck no longer exists"; - return {ErrorCodes::NamespaceNotFound, msg}; - } + auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); + uassert(ErrorCodes::SnapshotUnavailable, + "No snapshot available yet for dbCheck", + readTimestamp); + auto minVisible = collection->getMinimumVisibleSnapshot(); + if (minVisible && *readTimestamp < *collection->getMinimumVisibleSnapshot()) { + return {ErrorCodes::SnapshotUnavailable, + str::stream() << "Unable to read from collection " << info.nss + << " due to pending catalog changes"}; + } - auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx); - auto minVisible = collection->getMinimumVisibleSnapshot(); - if (readTimestamp && minVisible && - *readTimestamp < *collection->getMinimumVisibleSnapshot()) { - return {ErrorCodes::SnapshotUnavailable, - str::stream() << "Unable to read from collection " << info.nss - << " due to pending catalog changes"}; - } + boost::optional<DbCheckHasher> hasher; + try { + hasher.emplace(opCtx, + *collection, + first, + info.end, + std::min(batchDocs, info.maxCount), + std::min(batchBytes, info.maxSize)); + } catch (const DBException& e) { + return e.toStatus(); + } - boost::optional<DbCheckHasher> hasher; - try { - hasher.emplace(opCtx, - collection, - first, - info.end, - std::min(batchDocs, info.maxCount), - std::min(batchBytes, info.maxSize)); - } catch (const DBException& e) { - return e.toStatus(); - } + const auto batchDeadline = Date_t::now() + Milliseconds(info.maxBatchTimeMillis); + Status status = hasher->hashAll(opCtx, batchDeadline); - const auto batchDeadline = Date_t::now() + Milliseconds(info.maxBatchTimeMillis); - Status status = hasher->hashAll(opCtx, batchDeadline); + if (!status.isOK()) { + return status; + } - if (!status.isOK()) { - return status; - } + std::string md5 = hasher->total(); - std::string md5 = hasher->total(); - - DbCheckOplogBatch batch; - batch.setType(OplogEntriesEnum::Batch); - batch.setNss(info.nss); - batch.setMd5(md5); - batch.setMinKey(first); - batch.setMaxKey(BSONKey(hasher->lastKey())); - batch.setReadTimestamp(readTimestamp); - - // Send information on this batch over the oplog. 
- result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON()); - result.readTimestamp = readTimestamp; - - result.nDocs = hasher->docsSeen(); - result.nBytes = hasher->bytesSeen(); - result.lastKey = hasher->lastKey(); - result.md5 = md5; - - break; - } catch (const ExceptionFor<ErrorCodes::LockTimeout>& e) { - if (attempt > gDbCheckCollectionTryLockMaxAttempts.load()) { - return StatusWith<BatchStats>(e.code(), - "Unable to acquire the collection lock"); - } + DbCheckOplogBatch batch; + batch.setType(OplogEntriesEnum::Batch); + batch.setNss(info.nss); + batch.setMd5(md5); + batch.setMinKey(first); + batch.setMaxKey(BSONKey(hasher->lastKey())); + batch.setReadTimestamp(readTimestamp); - // Bounded exponential backoff between tryLocks. - opCtx->sleepFor(backoffMs); - const auto maxBackoffMillis = - Milliseconds(gDbCheckCollectionTryLockMaxBackoffMillis.load()); - if (backoffMs < maxBackoffMillis) { - auto backoff = durationCount<Milliseconds>(backoffMs); - auto initialBackoff = durationCount<Milliseconds>(initialBackoffMs); - backoff *= initialBackoff; - backoffMs = Milliseconds(backoff); - } - if (backoffMs > maxBackoffMillis) { - backoffMs = maxBackoffMillis; - } - LOGV2_DEBUG(6175700, - 1, - "Could not acquire collection lock, retrying", - "ns"_attr = info.nss.ns(), - "batchRangeMin"_attr = info.start.obj(), - "batchRangeMax"_attr = info.end.obj(), - "attempt"_attr = attempt, - "backoff"_attr = backoffMs); - } - } + // Send information on this batch over the oplog. + BatchStats result; + result.time = _logOp(opCtx, info.nss, collection->uuid(), batch.toBSON()); + result.readTimestamp = readTimestamp; + + result.nDocs = hasher->docsSeen(); + result.nBytes = hasher->bytesSeen(); + result.lastKey = hasher->lastKey(); + result.md5 = md5; return result; } @@ -663,7 +625,6 @@ public: " maxDocsPerBatch: <max number of docs/batch>\n" " maxBytesPerBatch: <try to keep a batch within max bytes/batch>\n" " maxBatchTimeMillis: <max time processing a batch in milliseconds>\n" - " readTimestamp: <bool, read at a timestamp without strong locks> }\n" "to check a collection.\n" "Invoke with {dbCheck: 1} to check all collections in the database."; } diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index 6374a093936..0b6f90d0203 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -485,6 +485,10 @@ public: return Request::kCommandDescription.toString(); } + bool allowedWithSecurityToken() const final { + return true; + } + // Assume that appendCollectionStorageStats() gives us a valid response. 
void validateResult(const BSONObj& resultObj) final {} @@ -587,6 +591,10 @@ public: CmdDbStats() : TypedCommand(Request::kCommandName, Request::kCommandAlias) {} + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; diff --git a/src/mongo/db/commands/dbcommands_d.cpp b/src/mongo/db/commands/dbcommands_d.cpp index 5f42cb45010..5ad0adf46c5 100644 --- a/src/mongo/db/commands/dbcommands_d.cpp +++ b/src/mongo/db/commands/dbcommands_d.cpp @@ -248,6 +248,10 @@ public: return Status::OK(); } + bool allowedWithSecurityToken() const final { + return true; + } + bool run(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj& jsobj, diff --git a/src/mongo/db/commands/distinct.cpp b/src/mongo/db/commands/distinct.cpp index 070da8d285c..eb599424001 100644 --- a/src/mongo/db/commands/distinct.cpp +++ b/src/mongo/db/commands/distinct.cpp @@ -57,6 +57,7 @@ #include "mongo/db/query/query_planner_common.h" #include "mongo/db/query/view_response_formatter.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/views/resolved_view.h" #include "mongo/logv2/log.h" #include "mongo/util/database_name_util.h" @@ -244,6 +245,16 @@ public: invocation->markMirrored(); } + if (analyze_shard_key::supportsPersistingSampledQueries() && parsedDistinct.getSampleId()) { + auto cq = parsedDistinct.getQuery(); + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addDistinctQuery(*parsedDistinct.getSampleId(), + nss, + cq->getQueryObj(), + cq->getFindCommandRequest().getCollation()) + .getAsync([](auto) {}); + } + if (ctx->getView()) { // Relinquish locks. The aggregation command will re-acquire them. ctx.reset(); diff --git a/src/mongo/db/commands/external_data_source_commands_test.cpp b/src/mongo/db/commands/external_data_source_commands_test.cpp new file mode 100644 index 00000000000..113495efdc8 --- /dev/null +++ b/src/mongo/db/commands/external_data_source_commands_test.cpp @@ -0,0 +1,953 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include <fmt/format.h> + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/dbclient_cursor.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/pipeline/aggregation_request_helper.h" +#include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/storage/named_pipe.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { +using namespace fmt::literals; + +class PipeWaiter { +public: + void notify() { + { + stdx::unique_lock lk(m); + pipeCreated = true; + } + cv.notify_one(); + } + + void wait() { + stdx::unique_lock lk(m); + cv.wait(lk, [&] { return pipeCreated; }); + } + +private: + Mutex m; + stdx::condition_variable cv; + bool pipeCreated = false; +}; + +class ExternalDataSourceCommandsTest : public ServiceContextMongoDTest { +protected: + void setUp() override { + ServiceContextMongoDTest::setUp(); + + std::srand(std::time(0)); + + const auto service = getServiceContext(); + auto replCoord = + std::make_unique<repl::ReplicationCoordinatorMock>(service, repl::ReplSettings{}); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + repl::createOplog(_opCtx); + + computeModeEnabled = true; + } + + void tearDown() override { + computeModeEnabled = false; + ServiceContextMongoDTest::tearDown(); + } + + std::vector<BSONObj> generateRandomSimpleDocs(int count) { + std::vector<BSONObj> docs; + for (int i = 0; i < count; ++i) { + docs.emplace_back(BSON("a" << std::rand() % 10)); + } + + return docs; + } + + // Generates a large readable random string to aid debugging. + std::string getRandomReadableLargeString() { + int count = std::rand() % 100 + 2024; + std::string str(count, '\0'); + for (int i = 0; i < count; ++i) { + str[i] = static_cast<char>(std::rand() % 26) + 'a'; + } + + return str; + } + + std::vector<BSONObj> generateRandomLargeDocs(int count) { + std::vector<BSONObj> docs; + for (int i = 0; i < count; ++i) { + docs.emplace_back(BSON("a" << getRandomReadableLargeString())); + } + + return docs; + } + + // This verifies that a simple explain aggregate command works. Virtual collections are created + // even for explain aggregate command. + void verifyExplainAggCommand(DBDirectClient& client, const BSONObj& explainAggCmdObj) { + // The first request. + BSONObj res; + ASSERT_TRUE(client.runCommand(kDatabaseName, explainAggCmdObj.getOwned(), res)) + << "Expected to succeed but failed. result = {}"_format(res.toString()); + // Sanity checks of result. + ASSERT_EQ(res["ok"].Number(), 1.0) + << "Expected to succeed but failed. 
result = {}"_format(res.toString()); + } + + ServiceContext::UniqueOperationContext _uniqueOpCtx{makeOperationContext()}; + OperationContext* _opCtx{_uniqueOpCtx.get()}; + + BSONObj explainSingleNamedPipeAggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + explain: true, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + BSONObj explainMultipleNamedPipesAggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + explain: true, + $_externalDataSources: [{ + collName: "coll", + dataSources: [ + {url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}, + {url: "file://named_pipe2", storageType: "pipe", fileType: "bson"} + ] + }] +} + )"); + + static constexpr auto kDatabaseName = "external_data_source"; +}; + +TEST_F(ExternalDataSourceCommandsTest, SimpleScanAggRequest) { + const auto nDocs = std::rand() % 100 + 1; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + pipeWriter.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + // The first request. + BSONObj res; + ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res)); + + // Sanity checks of result. + ASSERT_EQ(res["ok"].Number(), 1.0); + ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch")); + + // The default batch size is 101 and so all data must be contained in the first batch. cursor.id + // == 0 means that no cursor is necessary. + ASSERT_TRUE(res["cursor"].Obj().hasField("id") && res["cursor"]["id"].Long() == 0); + auto resDocs = res["cursor"]["firstBatch"].Array(); + ASSERT_EQ(resDocs.size(), nDocs); + for (int i = 0; i < nDocs; ++i) { + ASSERT_BSONOBJ_EQ(resDocs[i].Obj(), srcDocs[i]); + } + + // The second request. This verifies that virtual collections are cleaned up after the + // aggregation request is done. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, SimpleScanOverMultipleNamedPipesAggRequest) { + // This data set fits into the first batch. + const auto nDocs = std::rand() % 50; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + // Pushes data into multiple named pipes. We can't push data into multiple named pipes + // simultaneously because writers will be blocked until the reader consumes data. So, we push + // data into one named pipe after another. 
+ stdx::thread producer([&] { + NamedPipeOutput pipeWriter1("named_pipe1"); + NamedPipeOutput pipeWriter2("named_pipe2"); + pw.notify(); + pipeWriter1.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter1.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter1.close(); + + pipeWriter2.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter2.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter2.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [ + {url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}, + {url: "file://named_pipe2", storageType: "pipe", fileType: "bson"} + ] + }] +} + )"); + + // The first request. + BSONObj res; + ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res)); + + // Sanity checks of result. + ASSERT_EQ(res["ok"].Number(), 1.0); + ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch")); + + // The default batch size is 101 and so all data must be contained in the first batch. cursor.id + // == 0 means that no cursor is necessary. + ASSERT_TRUE(res["cursor"].Obj().hasField("id") && res["cursor"]["id"].Long() == 0); + auto resDocs = res["cursor"]["firstBatch"].Array(); + ASSERT_EQ(resDocs.size(), nDocs * 2); + for (int i = 0; i < nDocs; ++i) { + ASSERT_BSONOBJ_EQ(resDocs[i].Obj(), srcDocs[i % nDocs]); + } + + // The second request. This verifies that virtual collections are cleaned up after the + // aggregation request is done. + verifyExplainAggCommand(client, explainMultipleNamedPipesAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, SimpleScanOverLargeObjectsAggRequest) { + // MultiBsonStreamCursor's default buffer size is 8K and 2K (at minimum) * 20 would be enough to + // exceed the initial read. This data set is highly likely to span multiple reads. + const auto nDocs = std::rand() % 80 + 20; + std::vector<BSONObj> srcDocs = generateRandomLargeDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + pipeWriter.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + // The first request. + BSONObj res; + ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res)); + + // Sanity checks of result. + ASSERT_EQ(res["ok"].Number(), 1.0); + ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch")); + + // The default batch size is 101 and so all data must be contained in the first batch. cursor.id + // == 0 means that no cursor is necessary. + ASSERT_TRUE(res["cursor"].Obj().hasField("id") && res["cursor"]["id"].Long() == 0); + auto resDocs = res["cursor"]["firstBatch"].Array(); + ASSERT_EQ(resDocs.size(), nDocs); + for (int i = 0; i < nDocs; ++i) { + ASSERT_BSONOBJ_EQ(resDocs[i].Obj(), srcDocs[i]); + } + + // The second request. 
This verifies that virtual collections are cleaned up after the + // aggregation request is done. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +// Tests that 'explain' flag works and also tests that the same aggregation request works with the +// same $_externalDataSources again to see whether there are no remaining virtual collections left +// behind after the aggregation request is done. +TEST_F(ExternalDataSourceCommandsTest, ExplainAggRequest) { + DBDirectClient client(_opCtx); + // The first request. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); + + // The second request. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, SimpleScanMultiBatchAggRequest) { + // This 'nDocs' causes a cursor to be created for a simple scan aggregate command. + const auto nDocs = std::rand() % 100 + 102; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + pipeWriter.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj); + ASSERT_OK(swAggReq.getStatus()); + auto swCursor = DBClientCursor::fromAggregationRequest( + &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false); + ASSERT_OK(swCursor.getStatus()); + + auto cursor = std::move(swCursor.getValue()); + int resCnt = 0; + // While iterating over the cursor, getMore() request(s) will be sent and the server-side cursor + // will be destroyed after all data is exhausted. + while (cursor->more()) { + auto doc = cursor->next(); + ASSERT_BSONOBJ_EQ(doc, srcDocs[resCnt]); + ++resCnt; + } + ASSERT_EQ(resCnt, nDocs); + + // The second explain request. This verifies that virtual collections are cleaned up after + // multi-batch result for an aggregation request. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, SimpleMatchAggRequest) { + const auto nDocs = std::rand() % 100 + 1; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + // Expected results for {$match: {a: {$lt: 5}}}. + std::vector<BSONObj> expectedDocs; + std::for_each(srcDocs.begin(), srcDocs.end(), [&](const BSONObj& doc) { + if (doc["a"].Int() < 5) { + expectedDocs.emplace_back(doc); + } + }); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + pipeWriter.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. 
+ pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [{$match: {a: {$lt: 5}}}], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj); + ASSERT_OK(swAggReq.getStatus()); + auto swCursor = DBClientCursor::fromAggregationRequest( + &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false); + ASSERT_OK(swCursor.getStatus()); + + auto cursor = std::move(swCursor.getValue()); + int resCnt = 0; + while (cursor->more()) { + auto doc = cursor->next(); + ASSERT_BSONOBJ_EQ(doc, expectedDocs[resCnt]); + ++resCnt; + } + ASSERT_EQ(resCnt, expectedDocs.size()); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the aggregation request is done. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, ScanOverRandomInvalidDataAggRequest) { + const auto nDocs = std::rand() % 100 + 1; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + const size_t failPoint = std::rand() % nDocs; + pipeWriter.open(); + for (size_t i = 0; i < srcDocs.size(); ++i) { + if (i == failPoint) { + // Intentionally pushes invalid data at the fail point so that an error happens at + // the reader-side + pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize() / 2); + } else { + pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize()); + } + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [{$match: {a: {$lt: 5}}}], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + BSONObj res; + ASSERT_FALSE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res)); + ASSERT_EQ(res["ok"].Number(), 0.0); + // The fail point is randomly chosen and different error codes are expected, depending on the + // chosen fail point. + ASSERT_NE(ErrorCodes::Error(res["code"].Int()), ErrorCodes::OK); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the aggregation request fails. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, ScanOverRandomInvalidDataAtSecondBatchAggRequest) { + // This 'nDocs' causes a cursor to be created for a simple scan aggregate command. + const auto nDocs = std::rand() % 100 + 102; // 201 >= nDocs >= 102 + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + // The fail point occurs at the second batch. 
+ const size_t failPoint = 101 + std::rand() % (nDocs - 101); // 200 >= failPoint >= 101 + pipeWriter.open(); + for (size_t i = 0; i < srcDocs.size(); ++i) { + if (i == failPoint) { + // Intentionally pushes invalid data at the fail point so that an error happens at + // the reader-side + pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize() / 2); + } else { + pipeWriter.write(srcDocs[i].objdata(), srcDocs[i].objsize()); + } + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj); + ASSERT_OK(swAggReq.getStatus()); + auto swCursor = DBClientCursor::fromAggregationRequest( + &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false); + ASSERT_OK(swCursor.getStatus()); + + auto cursor = std::move(swCursor.getValue()); + int resCnt = 0; + bool errorOccurred = false; + try { + while (cursor->more()) { + auto doc = cursor->next(); + ASSERT_BSONOBJ_EQ(doc, srcDocs[resCnt]); + ++resCnt; + } + } catch (const DBException& ex) { + errorOccurred = true; + ASSERT_NE(ex.code(), ErrorCodes::OK); + } + ASSERT_TRUE(errorOccurred); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the getMore request for the aggregation results fails. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, KillCursorAfterAggRequest) { + // This 'nDocs' causes a cursor to be created for a simple scan aggregate command. + const auto nDocs = std::rand() % 100 + 102; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + pipeWriter.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + // The first request. + BSONObj res; + ASSERT_TRUE(client.runCommand(kDatabaseName, aggCmdObj.getOwned(), res)); + + // Sanity checks of result. + ASSERT_EQ(res["ok"].Number(), 1.0); + ASSERT_TRUE(res.hasField("cursor") && res["cursor"].Obj().hasField("firstBatch")); + + // The default batch size is 101 and results can be returned through multiple batches. cursor.id + // != 0 means that a cursor is created. + auto cursorId = res["cursor"]["id"].Long(); + ASSERT_TRUE(res["cursor"].Obj().hasField("id") && cursorId != 0); + + // Kills the cursor. 
+ auto killCursorCmdObj = BSON("killCursors" + << "coll" + << "cursors" << BSON_ARRAY(cursorId)); + ASSERT_TRUE(client.runCommand(kDatabaseName, killCursorCmdObj.getOwned(), res)); + ASSERT_EQ(res["ok"].Number(), 1.0); + auto cursorsKilled = res["cursorsKilled"].Array(); + ASSERT_TRUE(cursorsKilled.size() == 1 && cursorsKilled[0].Long() == cursorId); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the cursor for the aggregate request is killed. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, SimpleScanAndUnionWithMultipleSourcesAggRequest) { + const auto nDocs = std::rand() % 100 + 1; + std::vector<BSONObj> srcDocs = generateRandomSimpleDocs(nDocs); + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter1("named_pipe1"); + pw.notify(); + pipeWriter1.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter1.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter1.close(); + + NamedPipeOutput pipeWriter2("named_pipe2"); + pipeWriter2.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter2.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter2.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + // An aggregate request with a simple scan and $unionWith stage. $_externalDataSources option + // defines multiple data sources. + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll1", + pipeline: [{$unionWith: "coll2"}], + cursor: {}, + $_externalDataSources: [{ + collName: "coll1", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }, { + collName: "coll2", + dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj); + ASSERT_OK(swAggReq.getStatus()); + auto swCursor = DBClientCursor::fromAggregationRequest( + &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false); + ASSERT_OK(swCursor.getStatus()); + + auto cursor = std::move(swCursor.getValue()); + int resCnt = 0; + while (cursor->more()) { + auto doc = cursor->next(); + // Simple scan from 'coll1' first and then $unionWith from 'coll2'. + ASSERT_BSONOBJ_EQ(doc, srcDocs[resCnt % nDocs]); + ++resCnt; + } + ASSERT_EQ(resCnt, nDocs * 2); + + auto explainAggCmdObj = fromjson(R"( +{ + aggregate: "coll1", + pipeline: [{$unionWith: "coll2"}], + explain: true, + $_externalDataSources: [{ + collName: "coll1", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }, { + collName: "coll2", + dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the aggregation request is done. 
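Why re-running an explain with the same $_externalDataSources works as a cleanup probe (an inference from the ExternalDataSourceScopeGuard implementation later in this diff, not something the tests assert directly): each request calls createVirtualCollection() again for the same collName, and creating a collection over a namespace that still exists should fail, so the second request can only succeed if the first request's virtual collections were in fact dropped. A sketch under that assumption:

    // Hypothetical: both guards are built from the same 'usedExternalDataSources' list.
    ExternalDataSourceScopeGuard first(opCtx, usedExternalDataSources);   // creates "coll"
    // ... first aggregate finishes; the guard's destructor drops "coll" ...
    ExternalDataSourceScopeGuard second(opCtx, usedExternalDataSources);  // succeeds only if
                                                                          // "coll" was dropped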
+ verifyExplainAggCommand(client, explainAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, GroupAggRequest) { + std::vector<BSONObj> srcDocs = { + fromjson(R"( + { + "_id" : 1, + "item" : "a", + "quantity" : 2 + })"), + fromjson(R"( + { + "_id" : 2, + "item" : "b", + "quantity" : 1 + })"), + fromjson(R"( + { + "_id" : 3, + "item" : "a", + "quantity" : 5 + })"), + fromjson(R"( + { + "_id" : 4, + "item" : "b", + "quantity" : 10 + })"), + fromjson(R"( + { + "_id" : 5, + "item" : "c", + "quantity" : 10 + })"), + }; + PipeWaiter pw; + + stdx::thread producer([&] { + NamedPipeOutput pipeWriter("named_pipe1"); + pw.notify(); + pipeWriter.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll", + pipeline: [{$group: {_id: "$item", o: {$sum: "$quantity"}}}], + cursor: {}, + $_externalDataSources: [{ + collName: "coll", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + std::vector<BSONObj> expectedRes = { + fromjson(R"( + { + "_id" : "a", + "o" : 7 + })"), + fromjson(R"( + { + "_id" : "b", + "o" : 11 + })"), + fromjson(R"( + { + "_id" : "c", + "o" : 10 + })"), + }; + + auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj); + ASSERT_OK(swAggReq.getStatus()); + auto swCursor = DBClientCursor::fromAggregationRequest( + &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false); + ASSERT_OK(swCursor.getStatus()); + + auto cursor = std::move(swCursor.getValue()); + int resCnt = 0; + while (cursor->more()) { + auto doc = cursor->next(); + // Result set is pretty small and so we use linear search of vector. + ASSERT_TRUE( + std::find_if(expectedRes.begin(), expectedRes.end(), [&](const BSONObj& expectedObj) { + return expectedObj.objsize() == doc.objsize() && + std::memcmp(expectedObj.objdata(), doc.objdata(), expectedObj.objsize()) == 0; + }) != expectedRes.end()); + ++resCnt; + } + ASSERT_EQ(resCnt, expectedRes.size()); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the aggregation request is done. + verifyExplainAggCommand(client, explainSingleNamedPipeAggCmdObj); +} + +TEST_F(ExternalDataSourceCommandsTest, LookupAggRequest) { + std::vector<BSONObj> srcDocs = { + fromjson(R"( + { + "a" : 1, + "data" : "abcd" + })"), + fromjson(R"( + { + "a" : 2, + "data" : "efgh" + })"), + fromjson(R"( + { + "a" : 3, + "data" : "ijkl" + })"), + }; + PipeWaiter pw; + + // For the $lookup stage, we need data to be available for both named pipes simultaneously + // because $lookup would read data from both collections and so we use two different named + // pipes and push data into the inner side first. To avoid a race condition, notify the reader + // side after both named pipes are created. This order is geared toward the hash join algorithm. + stdx::thread producer([&] { + NamedPipeOutput pipeWriter2("named_pipe2"); + NamedPipeOutput pipeWriter1("named_pipe1"); + pw.notify(); + + // Pushes data into the inner side (== coll2 with named_pipe2) first because the hash join + // builds the inner (or build) side first.
+ pipeWriter2.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter2.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter2.close(); + + pipeWriter1.open(); + for (auto&& srcDoc : srcDocs) { + pipeWriter1.write(srcDoc.objdata(), srcDoc.objsize()); + } + pipeWriter1.close(); + }); + ON_BLOCK_EXIT([&] { producer.join(); }); + + // Gives some time to the producer so that it can initialize a named pipe. + pw.wait(); + + DBDirectClient client(_opCtx); + auto aggCmdObj = fromjson(R"( +{ + aggregate: "coll1", + pipeline: [{$lookup: {from: "coll2", localField: "a", foreignField: "a", as: "o"}}], + cursor: {}, + $_externalDataSources: [{ + collName: "coll1", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }, { + collName: "coll2", + dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + std::vector<BSONObj> expectedRes = { + fromjson(R"( + { + "a" : 1, + "data" : "abcd", + "o" : [{"a": 1, "data": "abcd"}] + })"), + fromjson(R"( + { + "a" : 2, + "data" : "efgh", + "o" : [{"a": 2, "data": "efgh"}] + })"), + fromjson(R"( + { + "a" : 3, + "data" : "ijkl", + "o" : [{"a": 3, "data": "ijkl"}] + })"), + }; + + auto swAggReq = aggregation_request_helper::parseFromBSONForTests(kDatabaseName, aggCmdObj); + ASSERT_OK(swAggReq.getStatus()); + auto swCursor = DBClientCursor::fromAggregationRequest( + &client, swAggReq.getValue(), /*secondaryOk*/ false, /*useExhaust*/ false); + ASSERT_OK(swCursor.getStatus()); + + auto cursor = std::move(swCursor.getValue()); + int resCnt = 0; + while (cursor->more()) { + auto doc = cursor->next(); + // Result set is pretty small and so we use linear search of vector. + ASSERT_TRUE( + std::find_if(expectedRes.begin(), expectedRes.end(), [&](const BSONObj& expectedObj) { + return expectedObj.objsize() == doc.objsize() && + std::memcmp(expectedObj.objdata(), doc.objdata(), expectedObj.objsize()) == 0; + }) != expectedRes.end()); + ++resCnt; + } + ASSERT_EQ(resCnt, expectedRes.size()); + + auto explainAggCmdObj = fromjson(R"( +{ + aggregate: "coll1", + pipeline: [{$lookup: {from: "coll2", localField: "a", foreignField: "a", as: "o"}}], + explain: true, + $_externalDataSources: [{ + collName: "coll1", + dataSources: [{url: "file://named_pipe1", storageType: "pipe", fileType: "bson"}] + }, { + collName: "coll2", + dataSources: [{url: "file://named_pipe2", storageType: "pipe", fileType: "bson"}] + }] +} + )"); + + // The second explain request. This verifies that virtual collections are cleaned up after + // the aggregation request is done. + verifyExplainAggCommand(client, explainAggCmdObj); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/commands/external_data_source_scope_guard.cpp b/src/mongo/db/commands/external_data_source_scope_guard.cpp new file mode 100644 index 00000000000..f9a29fcba67 --- /dev/null +++ b/src/mongo/db/commands/external_data_source_scope_guard.cpp @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/commands/external_data_source_scope_guard.h" + +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/drop_collection.h" +#include "mongo/db/catalog/virtual_collection_options.h" +#include "mongo/db/drop_gen.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/external_data_source_option_gen.h" +#include "mongo/logv2/log.h" +#include "mongo/util/destructor_guard.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +namespace mongo { +ExternalDataSourceScopeGuard::ExternalDataSourceScopeGuard( + OperationContext* opCtx, + const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>& + usedExternalDataSources) + : _opCtx(opCtx) { + // In case any virtual collection could not be created, so that the dtor will not have a + // chance to be executed, clean up the collections that have already been created at that + // moment. + ScopeGuard dropVcollGuard([&] { dropVirtualCollections(); }); + + for (auto&& [extDataSourceNss, dataSources] : usedExternalDataSources) { + VirtualCollectionOptions vopts(dataSources); + uassertStatusOK(createVirtualCollection(opCtx, extDataSourceNss, vopts)); + _toBeDroppedVirtualCollections.emplace_back(extDataSourceNss); + } + + dropVcollGuard.dismiss(); +} + +void ExternalDataSourceScopeGuard::dropVirtualCollections() noexcept { + // The move constructor sets '_opCtx' to null when ownership is moved to the other object, + // which means this object must not try to drop collections. There's nothing to drop if + // '_opCtx' is null. + if (!_opCtx) { + return; + } + + // This function is called in the context of a destructor or exception handling, so guard it + // against any exceptions. + DESTRUCTOR_GUARD({ + for (auto&& nss : _toBeDroppedVirtualCollections) { + DropReply reply; + auto status = + dropCollection(_opCtx, + nss, + &reply, + DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops); + if (!status.isOK()) { + LOGV2_ERROR(6968700, "Failed to drop an external data source", "coll"_attr = nss); + } + } + }); +} +} // namespace mongo diff --git a/src/mongo/db/commands/external_data_source_scope_guard.h b/src/mongo/db/commands/external_data_source_scope_guard.h new file mode 100644 index 00000000000..21b48890432 --- /dev/null +++ b/src/mongo/db/commands/external_data_source_scope_guard.h @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc.
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/clientcursor.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/external_data_source_option_gen.h" + +namespace mongo { +/** + * This class makes sure that virtual collections that are created for external data sources are + * dropped when this object is destroyed. + */ +class ExternalDataSourceScopeGuard { +public: + // Makes ExternalDataSourceScopeGuard a decoration of ClientCursor. + static const ClientCursor::Decoration<std::shared_ptr<ExternalDataSourceScopeGuard>> get; + + // Updates the operation context of the decorated ExternalDataSourceScopeGuard object of + // 'cursor' so that it can drop virtual collections in the new 'opCtx'. + static void updateOperationContext(const ClientCursor* cursor, OperationContext* opCtx) { + if (auto self = get(cursor); self) { + get(cursor)->_opCtx = opCtx; + } + } + + ExternalDataSourceScopeGuard() : _opCtx(nullptr), _toBeDroppedVirtualCollections() {} + + ExternalDataSourceScopeGuard( + OperationContext* opCtx, + const std::vector<std::pair<NamespaceString, std::vector<ExternalDataSourceInfo>>>& + usedExternalDataSources); + + // It does not make sense to support a copy ctor because this object must drop the created + // virtual collections. + ExternalDataSourceScopeGuard(const ExternalDataSourceScopeGuard&) = delete; + + ExternalDataSourceScopeGuard(ExternalDataSourceScopeGuard&& other) noexcept + : _opCtx(other._opCtx), + _toBeDroppedVirtualCollections(std::move(other._toBeDroppedVirtualCollections)) { + // Ownership of the created virtual collections is moved to this object, and the other + // object must not try to drop them anymore.
+ other._opCtx = nullptr; + } + + ~ExternalDataSourceScopeGuard() { + dropVirtualCollections(); + } + +private: + void dropVirtualCollections() noexcept; + + OperationContext* _opCtx; + std::vector<NamespaceString> _toBeDroppedVirtualCollections; +}; +} // namespace mongo diff --git a/src/mongo/db/commands/fail_point_cmd.cpp b/src/mongo/db/commands/fail_point_cmd.cpp index 7e868735f5b..a067522d6fe 100644 --- a/src/mongo/db/commands/fail_point_cmd.cpp +++ b/src/mongo/db/commands/fail_point_cmd.cpp @@ -93,6 +93,10 @@ public: return Status::OK(); } + bool allowedWithSecurityToken() const final { + return true; + } + std::string help() const override { return "modifies the settings of a fail point"; } diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 1ba9efa0a75..ec70924da40 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -60,6 +60,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/resource_consumption_metrics.h" #include "mongo/db/stats/top.h" @@ -76,6 +77,7 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(failAllFindAndModify); MONGO_FAIL_POINT_DEFINE(hangBeforeFindAndModifyPerformsUpdate); /** @@ -682,6 +684,16 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( } } + if (analyze_shard_key::supportsPersistingSampledQueries() && request().getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addFindAndModifyQuery(request()) + .getAsync([](auto) {}); + } + + if (MONGO_unlikely(failAllFindAndModify.shouldFail())) { + uasserted(ErrorCodes::InternalError, "failAllFindAndModify failpoint active!"); + } + const bool inTransaction = opCtx->inMultiDocumentTransaction(); // Although usually the PlanExecutor handles WCE internally, it will throw WCEs when it @@ -711,6 +723,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( if (opCtx->getTxnNumber()) { updateRequest.setStmtIds({stmtId}); } + updateRequest.setSampleId(req.getSampleId()); const ExtensionsCallbackReal extensionsCallback( opCtx, &updateRequest.getNamespaceString()); diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index d048a2ec616..1188981c991 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -56,6 +56,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/resource_consumption_metrics.h" @@ -65,6 +66,7 @@ #include "mongo/db/transaction/transaction_participant.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/assert_util.h" #include "mongo/util/database_name_util.h" #include "mongo/util/fail_point.h" @@ -362,6 +364,8 @@ public: // execution tree with an EOFStage. const auto& collection = ctx->getCollection(); + cq->setUseCqfIfEligible(true); + // Get the execution plan for the query. bool permitYield = true; auto exec = @@ -433,10 +437,10 @@ public: // The presence of a term in the request indicates that this is an internal replication // oplog read request. 
if (term && isOplogNss) { - // We do not want to take tickets for internal (replication) oplog reads. Stalling - // on ticket acquisition can cause complicated deadlocks. Primaries may depend on - // data reaching secondaries in order to proceed; and secondaries may get stalled - // replicating because of an inability to acquire a read ticket. + // We do not want to wait to take tickets for internal (replication) oplog reads. + // Stalling on ticket acquisition can cause complicated deadlocks. Primaries may + // depend on data reaching secondaries in order to proceed; and secondaries may get + // stalled replicating because of an inability to acquire a read ticket. opCtx->lockState()->setAdmissionPriority(AdmissionContext::Priority::kImmediate); } @@ -481,6 +485,16 @@ public: .expectedUUID(findCommand->getCollectionUUID())); const auto& nss = ctx->getNss(); + if (analyze_shard_key::supportsPersistingSampledQueries() && + findCommand->getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addFindQuery(*findCommand->getSampleId(), + nss, + findCommand->getFilter(), + findCommand->getCollation()) + .getAsync([](auto) {}); + } + // Going forward this operation must never ignore interrupt signals while waiting for // lock acquisition. This InterruptibleLockGuard will ensure that waiting for lock // re-acquisition after yielding will not ignore interrupt signals. This is necessary to @@ -542,10 +556,10 @@ public: const auto& findCommand = cq->getFindCommandRequest(); auto viewAggregationCommand = uassertStatusOK(query_request_helper::asAggregationCommand(findCommand)); - - BSONObj aggResult = CommandHelpers::runCommandDirectly( - opCtx, - OpMsgRequest::fromDBAndBody(_dbName.db(), std::move(viewAggregationCommand))); + auto aggRequest = + OpMsgRequestBuilder::create(_dbName, std::move(viewAggregationCommand)); + aggRequest.validatedTenancyScope = _request.validatedTenancyScope; + BSONObj aggResult = CommandHelpers::runCommandDirectly(opCtx, aggRequest); auto status = getStatusFromCommandResult(aggResult); if (status.code() == ErrorCodes::InvalidPipelineOperator) { uasserted(ErrorCodes::InvalidPipelineOperator, @@ -572,6 +586,8 @@ public: opCtx->recoveryUnit()->setReadOnce(true); } + cq->setUseCqfIfEligible(true); + // Get the execution plan for the query. bool permitYield = true; auto exec = diff --git a/src/mongo/db/commands/generic.cpp b/src/mongo/db/commands/generic.cpp index c5f8dde7e74..9bc1717f6e8 100644 --- a/src/mongo/db/commands/generic.cpp +++ b/src/mongo/db/commands/generic.cpp @@ -173,6 +173,10 @@ public: return false; } + bool allowedWithSecurityToken() const final { + return true; + } + bool run(OperationContext* opCtx, const DatabaseName&, const BSONObj& cmdObj, diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 3d6c73db66f..09aa73cc315 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -40,6 +40,7 @@ #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/external_data_source_scope_guard.h" #include "mongo/db/curop.h" #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/cursor_manager.h" @@ -485,6 +486,10 @@ public: setUpOperationContextStateForGetMore( opCtx, *cursorPin.getCursor(), _cmd, disableAwaitDataFailpointActive); + // Update opCtx of the decorated ExternalDataSourceScopeGuard object so that it can drop + // virtual collections in the new 'opCtx'. 
+ ExternalDataSourceScopeGuard::updateOperationContext(cursorPin.getCursor(), opCtx); + // On early return, typically due to a failed assertion, delete the cursor. ScopeGuard cursorDeleter([&] { cursorPin.deleteUnderlying(); }); @@ -717,10 +722,10 @@ public: // internal clients (see checkAuthForGetMore). curOp->debug().isReplOplogGetMore = true; - // We do not want to take tickets for internal (replication) oplog reads. Stalling - // on ticket acquisition can cause complicated deadlocks. Primaries may depend on - // data reaching secondaries in order to proceed; and secondaries may get stalled - // replicating because of an inability to acquire a read ticket. + // We do not want to wait to take tickets for internal (replication) oplog reads. + // Stalling on ticket acquisition can cause complicated deadlocks. Primaries may + // depend on data reaching secondaries in order to proceed; and secondaries may get + // stalled replicating because of an inability to acquire a read ticket. opCtx->lockState()->setAdmissionPriority(AdmissionContext::Priority::kImmediate); } } diff --git a/src/mongo/db/commands/parameters.cpp b/src/mongo/db/commands/parameters.cpp index 0cfb0f6dcf2..0829fcd0e32 100644 --- a/src/mongo/db/commands/parameters.cpp +++ b/src/mongo/db/commands/parameters.cpp @@ -238,6 +238,11 @@ public: h += "{ getParameter:'*' } or { getParameter:{allParameters: true} } to get everything\n"; return h; } + + bool allowedWithSecurityToken() const final { + return true; + } + bool run(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj& cmdObj, diff --git a/src/mongo/db/commands/pipeline_command.cpp b/src/mongo/db/commands/pipeline_command.cpp index b2fb0341568..13037602698 100644 --- a/src/mongo/db/commands/pipeline_command.cpp +++ b/src/mongo/db/commands/pipeline_command.cpp @@ -35,8 +35,8 @@ #include "mongo/db/auth/authorization_checks.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/create_collection.h" -#include "mongo/db/catalog/virtual_collection_options.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/external_data_source_scope_guard.h" #include "mongo/db/commands/run_aggregate.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregate_command_gen.h" @@ -46,7 +46,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/idl/idl_parser.h" -#include "mongo/stdx/unordered_set.h" +#include "mongo/util/assert_util.h" #include "mongo/util/database_name_util.h" namespace mongo { @@ -202,18 +202,20 @@ public: CommandHelpers::handleMarkKillOnClientDisconnect( opCtx, !Pipeline::aggHasWriteStage(_request.body)); - // TODO SERVER-69687 Create a virtual collection per each used external data source. - for (auto&& [collName, dataSources] : _usedExternalDataSources) { - VirtualCollectionOptions vopts(dataSources); - } - + // Create virtual collections and drop them when the aggregate command is done. + // Conceptually, ownership of the virtual collections is moved to the runAggregate() + // function together with 'dropVcollGuard' so that it can clean up the virtual + // collections when it is done with them. ExternalDataSourceScopeGuard will take care + // of the situation where any collection could not be created.
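// [Editorial aside, not part of the patch] The ownership transfer described in the comment
// above relies on the guard being move-only, as set up in the header earlier in this diff
// ('other._opCtx = nullptr' in the move constructor). Reduced to its essentials, with
// illustrative names (std::exchange is from <utility>; dropVirtualCollection is a
// hypothetical helper standing in for the real cleanup):
//
//   class ScopedDropper {
//   public:
//       ScopedDropper(OperationContext* opCtx, std::vector<NamespaceString> colls)
//           : _opCtx(opCtx), _colls(std::move(colls)) {}
//       // Moving transfers the cleanup duty; the moved-from guard is disarmed by
//       // nulling its opCtx, so only the final owner drops the collections.
//       ScopedDropper(ScopedDropper&& other) noexcept
//           : _opCtx(std::exchange(other._opCtx, nullptr)),
//             _colls(std::move(other._colls)) {}
//       ~ScopedDropper() {
//           if (_opCtx) {
//               for (auto&& nss : _colls)
//                   dropVirtualCollection(_opCtx, nss);  // hypothetical helper
//           }
//       }
//   private:
//       OperationContext* _opCtx;
//       std::vector<NamespaceString> _colls;
//   };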
+ ExternalDataSourceScopeGuard dropVcollGuard(opCtx, _usedExternalDataSources); uassertStatusOK(runAggregate(opCtx, _aggregationRequest.getNamespace(), _aggregationRequest, _liteParsedPipeline, _request.body, _privileges, - reply)); + reply, + std::move(dropVcollGuard))); // The aggregate command's response is unstable when 'explain' or 'exchange' fields are // set. @@ -231,14 +233,16 @@ public: void explain(OperationContext* opCtx, ExplainOptions::Verbosity verbosity, rpc::ReplyBuilderInterface* result) override { - + // See run() method for details. + ExternalDataSourceScopeGuard dropVcollGuard(opCtx, _usedExternalDataSources); uassertStatusOK(runAggregate(opCtx, _aggregationRequest.getNamespace(), _aggregationRequest, _liteParsedPipeline, _request.body, _privileges, - result)); + result, + std::move(dropVcollGuard))); } void doCheckAuthorization(OperationContext* opCtx) const override { diff --git a/src/mongo/db/commands/profile_common.cpp b/src/mongo/db/commands/profile_common.cpp index 54223b8f5a7..ba546e62067 100644 --- a/src/mongo/db/commands/profile_common.cpp +++ b/src/mongo/db/commands/profile_common.cpp @@ -133,6 +133,7 @@ bool ProfileCmdBase::run(OperationContext* opCtx, newState.append("filter"_sd, newSettings.filter->serialize()); } attrs.add("to", newState.obj()); + attrs.add("db", dbName.db()); LOGV2(48742, "Profiler settings changed", attrs); } diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index eb32a6c220b..cb15cf82247 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -42,6 +42,7 @@ #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/change_stream_serverless_helpers.h" +#include "mongo/db/commands/external_data_source_scope_guard.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" @@ -81,6 +82,7 @@ #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/query_analysis_writer.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/resource_consumption_metrics.h" @@ -659,7 +661,7 @@ Status runAggregate(OperationContext* opCtx, const BSONObj& cmdObj, const PrivilegeVector& privileges, rpc::ReplyBuilderInterface* result) { - return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result); + return runAggregate(opCtx, nss, request, {request}, cmdObj, privileges, result, {}); } Status runAggregate(OperationContext* opCtx, @@ -668,7 +670,8 @@ Status runAggregate(OperationContext* opCtx, const LiteParsedPipeline& liteParsedPipeline, const BSONObj& cmdObj, const PrivilegeVector& privileges, - rpc::ReplyBuilderInterface* result) { + rpc::ReplyBuilderInterface* result, + ExternalDataSourceScopeGuard externalDataSourceGuard) { // Perform some validations on the LiteParsedPipeline and request before continuing with the // aggregation command. 
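// [Editorial aside, not part of the patch] The hunk that follows adds to runAggregate() the
// same fire-and-forget query-sampling hook this patch adds to find and findAndModify: when
// the request carries a sampleId, the query is handed to the QueryAnalysisWriter and the
// resulting future is deliberately detached. Distilled shape of the hook, using only names
// that appear in this patch:
//
//   if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) {
//       analyze_shard_key::QueryAnalysisWriter::get(opCtx)
//           .addAggregateQuery(*request.getSampleId(),
//                              expCtx->ns,
//                              pipeline->getInitialQuery(),
//                              expCtx->getCollatorBSON())
//           .getAsync([](auto) {});  // ignore the outcome: sampling must never fail the query
//   }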
@@ -945,6 +948,15 @@ Status runAggregate(OperationContext* opCtx, } } + if (analyze_shard_key::supportsPersistingSampledQueries() && request.getSampleId()) { + analyze_shard_key::QueryAnalysisWriter::get(opCtx) + .addAggregateQuery(*request.getSampleId(), + expCtx->ns, + pipeline->getInitialQuery(), + expCtx->getCollatorBSON()) + .getAsync([](auto) {}); + } + // If the aggregate command supports encrypted collections, do rewrites of the pipeline to // support querying against encrypted fields. if (shouldDoFLERewrite(request)) { @@ -1021,6 +1033,8 @@ Status runAggregate(OperationContext* opCtx, p.deleteUnderlying(); } }); + auto extDataSrcGuard = + std::make_shared<ExternalDataSourceScopeGuard>(std::move(externalDataSourceGuard)); for (auto&& exec : execs) { ClientCursorParams cursorParams( std::move(exec), @@ -1038,6 +1052,10 @@ Status runAggregate(OperationContext* opCtx, pin->incNBatches(); cursors.emplace_back(pin.getCursor()); + // All cursors share ownership of 'extDataSrcGuard'; when the last cursor is destroyed, + // 'extDataSrcGuard' is destroyed as well, and the virtual collections that were created + // are dropped by the destructor of ExternalDataSourceScopeGuard. + ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard; pins.emplace_back(std::move(pin)); } diff --git a/src/mongo/db/commands/run_aggregate.h b/src/mongo/db/commands/run_aggregate.h index 0bb86ac91b0..33c03e80cdb 100644 --- a/src/mongo/db/commands/run_aggregate.h +++ b/src/mongo/db/commands/run_aggregate.h @@ -32,6 +32,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/auth/privilege.h" +#include "mongo/db/commands/external_data_source_scope_guard.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregate_command_gen.h" @@ -57,7 +58,8 @@ Status runAggregate(OperationContext* opCtx, const LiteParsedPipeline& liteParsedPipeline, const BSONObj& cmdObj, const PrivilegeVector& privileges, - rpc::ReplyBuilderInterface* result); + rpc::ReplyBuilderInterface* result, + ExternalDataSourceScopeGuard externalDataSourceGuard); /** * Convenience version that internally constructs the LiteParsedPipeline. diff --git a/src/mongo/db/commands/server_status_command.cpp b/src/mongo/db/commands/server_status_command.cpp index 62433e57e62..521e4a49010 100644 --- a/src/mongo/db/commands/server_status_command.cpp +++ b/src/mongo/db/commands/server_status_command.cpp @@ -78,6 +78,10 @@ public: return Status::OK(); } + bool allowedWithSecurityToken() const final { + return true; + } + bool run(OperationContext* opCtx, const DatabaseName& dbName, const BSONObj& cmdObj, diff --git a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl index 96bfcd775cd..c8f58df0c0c 100644 --- a/src/mongo/db/commands/tenant_migration_recipient_cmds.idl +++ b/src/mongo/db/commands/tenant_migration_recipient_cmds.idl @@ -97,6 +97,7 @@ commands: strict: true namespace: ignored api_version: "" + reply_type: recipientSyncDataResponse inline_chained_structs: true chained_structs: MigrationRecipientCommonData: MigrationRecipientCommonData @@ -144,6 +145,7 @@ commands: recipientForgetMigration: description: "Parser for the 'recipientForgetMigration' command." command_name: recipientForgetMigration + reply_type: OkReply strict: true namespace: ignored api_version: ""
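Editorial note on the cursor/guard lifetime contract in run_aggregate.cpp above: every cursor created by runAggregate() holds one reference to the guard, so the drop of the virtual collections is deferred until the last cursor goes away. A minimal sketch of the idiom under that reading (names are from this patch; the loop is simplified relative to the real cursor-creation loop):

    // Move the guard into shared ownership once...
    auto extDataSrcGuard =
        std::make_shared<ExternalDataSourceScopeGuard>(std::move(externalDataSourceGuard));
    for (auto&& pin : pins) {
        // ...then park one shared_ptr on each cursor's decoration. The use count now
        // tracks live cursors; destroying the last one runs ~ExternalDataSourceScopeGuard,
        // which drops the virtual collections.
        ExternalDataSourceScopeGuard::get(pin.getCursor()) = extDataSrcGuard;
    }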