From 58501015eff7961dc378abe1d49e064a3dcf3dbc Mon Sep 17 00:00:00 2001 From: Henrik Edin Date: Mon, 24 Sep 2018 14:10:22 -0400 Subject: SERVER-37295 Remove embedded dependency on process interface shard server. Refactor process interface system to use shim to allow for separate factories for embedded and mongod. --- src/mongo/SConscript | 1 + src/mongo/db/commands/SConscript | 2 +- src/mongo/db/commands/run_aggregate.cpp | 4 +- src/mongo/db/pipeline/SConscript | 37 +- src/mongo/db/pipeline/mongo_process_interface.cpp | 37 ++ src/mongo/db/pipeline/mongo_process_interface.h | 8 + src/mongo/db/pipeline/mongod_process_interface.cpp | 625 --------------------- src/mongo/db/pipeline/mongod_process_interface.h | 159 ------ .../pipeline/process_interface_factory_mongod.cpp | 43 ++ .../db/pipeline/process_interface_shardsvr.cpp | 245 ++++++++ src/mongo/db/pipeline/process_interface_shardsvr.h | 67 +++ .../db/pipeline/process_interface_standalone.cpp | 527 +++++++++++++++++ .../db/pipeline/process_interface_standalone.h | 132 +++++ src/mongo/embedded/SConscript | 2 + .../process_interface_factory_embedded.cpp | 40 ++ 15 files changed, 1138 insertions(+), 791 deletions(-) create mode 100644 src/mongo/db/pipeline/mongo_process_interface.cpp delete mode 100644 src/mongo/db/pipeline/mongod_process_interface.cpp delete mode 100644 src/mongo/db/pipeline/mongod_process_interface.h create mode 100644 src/mongo/db/pipeline/process_interface_factory_mongod.cpp create mode 100644 src/mongo/db/pipeline/process_interface_shardsvr.cpp create mode 100644 src/mongo/db/pipeline/process_interface_shardsvr.h create mode 100644 src/mongo/db/pipeline/process_interface_standalone.cpp create mode 100644 src/mongo/db/pipeline/process_interface_standalone.h create mode 100644 src/mongo/embedded/process_interface_factory_embedded.cpp (limited to 'src/mongo') diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 7d82797d5e1..593923ce799 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -314,6 +314,7 @@ env.Library( 'db/mongodandmongos', 'db/periodic_runner_job_abort_expired_transactions', 'db/periodic_runner_job_decrease_snapshot_cache_pressure', + 'db/pipeline/process_interface_factory_mongod', 'db/query_exec', 'db/repair_database', 'db/repair_database_and_check_version', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 984e9ce80a2..ec5b2339ae6 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -243,7 +243,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', '$BUILD_DIR/mongo/db/ops/write_ops_exec', - '$BUILD_DIR/mongo/db/pipeline/mongod_process_interface', + '$BUILD_DIR/mongo/db/pipeline/mongo_process_interface', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/rw_concern_d', '$BUILD_DIR/mongo/db/stats/counters', diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 7970dcac23e..ff5e0268dd6 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -49,7 +49,7 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" -#include "mongo/db/pipeline/mongod_process_interface.h" +#include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/query/collation/collator_factory_interface.h" @@ -328,7 +328,7 @@ boost::intrusive_ptr makeExpressionContext( new ExpressionContext(opCtx, request, std::move(collator), - MongoDInterface::create(opCtx), + MongoProcessInterface::create(opCtx), uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)), uuid); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index d32e34b061e..8da8342dbbe 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -263,6 +263,16 @@ env.Library( ] ) +env.Library( + target='mongo_process_interface', + source=[ + 'mongo_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ] +) + env.Library( target='mongo_process_common', source=[ @@ -276,15 +286,13 @@ env.Library( ) env.Library( - target='mongod_process_interface', + target='process_interface_standalone', source=[ - 'mongod_process_interface.cpp', + 'process_interface_standalone.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/query_exec', - '$BUILD_DIR/mongo/db/stats/top', - '$BUILD_DIR/mongo/s/sharding_api', 'mongo_process_common', ], LIBDEPS_PRIVATE=[ @@ -292,6 +300,17 @@ env.Library( ], ) +env.Library( + target='process_interface_shardsvr', + source=[ + 'process_interface_shardsvr.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/s/sharding_api', + 'process_interface_standalone', + ], +) + env.Library( target='mongos_process_interface', source=[ @@ -304,6 +323,16 @@ env.Library( ] ) +env.Library( + target="process_interface_factory_mongod", + source=[ + "process_interface_factory_mongod.cpp", + ], + LIBDEPS_PRIVATE=[ + 'process_interface_shardsvr', + ], +) + pipelineeEnv = env.Clone() pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) pipelineeEnv.Library( diff --git a/src/mongo/db/pipeline/mongo_process_interface.cpp b/src/mongo/db/pipeline/mongo_process_interface.cpp new file mode 100644 index 00000000000..547c1760148 --- /dev/null +++ b/src/mongo/db/pipeline/mongo_process_interface.cpp @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo_process_interface.h" + +namespace mongo { + +MONGO_DEFINE_SHIM(MongoProcessInterface::create); + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 974fd2807d4..664c956c33a 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -35,6 +35,7 @@ #include #include +#include "mongo/base/shim.h" #include "mongo/client/dbclient_base.h" #include "mongo/db/collection_index_usage_tracker.h" #include "mongo/db/generic_cursor.h" @@ -68,6 +69,13 @@ public: enum class CurrentOpSessionsMode { kIncludeIdle, kExcludeIdle }; enum class CurrentOpCursorMode { kIncludeCursors, kExcludeCursors }; + /** + * Factory function to create MongoProcessInterface of the right type. The implementation will + * be installed by a lib higher up in the link graph depending on the application type. + */ + static MONGO_DECLARE_SHIM( + (OperationContext * opCtx)->std::shared_ptr) create; + struct MakePipelineOptions { MakePipelineOptions(){}; diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp deleted file mode 100644 index 0141726f22e..00000000000 --- a/src/mongo/db/pipeline/mongod_process_interface.cpp +++ /dev/null @@ -1,625 +0,0 @@ -/** - * Copyright (C) 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery - -#include "mongo/platform/basic.h" - -#include "mongo/db/pipeline/mongod_process_interface.h" - -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/catalog/uuid_catalog.h" -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/curop.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/ops/write_ops_exec.h" -#include "mongo/db/ops/write_ops_gen.h" -#include "mongo/db/pipeline/document_source_cursor.h" -#include "mongo/db/pipeline/pipeline_d.h" -#include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/session_catalog.h" -#include "mongo/db/stats/fill_locker_info.h" -#include "mongo/db/stats/storage_stats.h" -#include "mongo/db/storage/backup_cursor_hooks.h" -#include "mongo/db/transaction_participant.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/grid.h" -#include "mongo/s/write_ops/cluster_write.h" -#include "mongo/util/log.h" - -namespace mongo { - -using boost::intrusive_ptr; -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using write_ops::Insert; -using write_ops::Update; -using write_ops::UpdateOpEntry; - -namespace { - -/** - * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. - */ -Insert buildInsertOp(const NamespaceString& nss, - std::vector&& objs, - bool bypassDocValidation) { - Insert insertOp(nss); - insertOp.setDocuments(std::move(objs)); - insertOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(true); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return insertOp; -} - -/** - * Builds an ordered update op on namespace 'nss' with update entries {q: , u: }. - * - * Note that 'queries' and 'updates' must be the same length. - */ -Update buildUpdateOp(const NamespaceString& nss, - std::vector&& queries, - std::vector&& updates, - bool upsert, - bool multi, - bool bypassDocValidation) { - Update updateOp(nss); - updateOp.setUpdates([&] { - std::vector updateEntries; - for (size_t index = 0; index < queries.size(); ++index) { - updateEntries.push_back([&] { - UpdateOpEntry entry; - entry.setQ(std::move(queries[index])); - entry.setU(std::move(updates[index])); - entry.setUpsert(upsert); - entry.setMulti(multi); - return entry; - }()); - } - return updateEntries; - }()); - updateOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(true); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return updateOp; -} - -// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each -// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of -// index. -bool keyPatternNamesExactPaths(const BSONObj& keyPattern, - const std::set& uniqueKeyPaths) { - size_t nFieldsMatched = 0; - for (auto&& elem : keyPattern) { - if (!elem.isNumber()) { - return false; - } - if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) { - return false; - } - ++nFieldsMatched; - } - return nFieldsMatched == uniqueKeyPaths.size(); -} - -bool supportsUniqueKey(const boost::intrusive_ptr& expCtx, - const IndexCatalogEntry* index, - const std::set& uniqueKeyPaths) { - return (index->descriptor()->unique() && !index->descriptor()->isPartial() && - keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) && - CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator())); -} - -} // namespace - -// static -std::shared_ptr MongoDInterface::create(OperationContext* opCtx) { - return ShardingState::get(opCtx)->enabled() - ? std::make_shared(opCtx) - : std::make_shared(opCtx); -} - -MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {} - -void MongoDInterface::setOperationContext(OperationContext* opCtx) { - _client.setOpCtx(opCtx); -} - -DBClientBase* MongoDInterface::directClient() { - return &_client; -} - -bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForReadCommand autoColl(opCtx, nss); - auto const css = CollectionShardingState::get(opCtx, nss); - return css->getMetadata(opCtx)->isSharded(); -} - -void MongoDInterface::insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& objs) { - auto writeResults = performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); - - // Only need to check that the final result passed because the inserts are ordered and the batch - // will stop on the first failure. - uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: "); -} - -void MongoDInterface::update(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& queries, - std::vector&& updates, - bool upsert, - bool multi) { - auto writeResults = performUpdates(expCtx->opCtx, - buildUpdateOp(ns, - std::move(queries), - std::move(updates), - upsert, - multi, - expCtx->bypassDocumentValidation)); - - // Only need to check that the final result passed because the updates are ordered and the batch - // will stop on the first failure. - uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: "); -} - -CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) { - AutoGetCollectionForReadCommand autoColl(opCtx, ns); - - Collection* collection = autoColl.getCollection(); - if (!collection) { - LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); - return CollectionIndexUsageMap(); - } - - return collection->infoCache()->getIndexUsageStats(); -} - -void MongoDInterface::appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const { - Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); -} - -Status MongoDInterface::appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const { - return appendCollectionStorageStats(opCtx, nss, param, builder); -} - -Status MongoDInterface::appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const { - return appendCollectionRecordCount(opCtx, nss, builder); -} - -BSONObj MongoDInterface::getCollectionOptions(const NamespaceString& nss) { - const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); -} - -void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list& originalIndexes) { - Lock::GlobalWrite globalLock(opCtx); - - uassert(ErrorCodes::CommandFailed, - str::stream() << "collection options of target collection " << targetNs.ns() - << " changed during processing. Original options: " - << originalCollectionOptions - << ", new options: " - << getCollectionOptions(targetNs), - SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions == - getCollectionOptions(targetNs))); - - auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); - uassert(ErrorCodes::CommandFailed, - str::stream() << "indexes of target collection " << targetNs.ns() - << " changed during processing.", - originalIndexes.size() == currentIndexes.size() && - std::equal(originalIndexes.begin(), - originalIndexes.end(), - currentIndexes.begin(), - SimpleBSONObjComparator::kInstance.makeEqualTo())); - - BSONObj info; - uassert(ErrorCodes::CommandFailed, - str::stream() << "renameCollection failed: " << info, - _client.runCommand("admin", renameCommandObj, info)); -} - -StatusWith> MongoDInterface::makePipeline( - const std::vector& rawPipeline, - const boost::intrusive_ptr& expCtx, - const MakePipelineOptions opts) { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - - if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); - } - - Status cursorStatus = Status::OK(); - - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); - } - - return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; -} - -Status MongoDInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr& expCtx, Pipeline* pipeline) { - invariant(pipeline->getSources().empty() || - !dynamic_cast(pipeline->getSources().front().get())); - - boost::optional autoColl; - if (expCtx->uuid) { - try { - autoColl.emplace(expCtx->opCtx, - NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}); - } catch (const ExceptionFor& ex) { - // The UUID doesn't exist anymore - return ex.toStatus(); - } - } else { - autoColl.emplace(expCtx->opCtx, expCtx->ns); - } - - // makePipeline() is only called to perform secondary aggregation requests and expects the - // collection representing the document source to be not-sharded. We confirm sharding state - // here to avoid taking a collection lock elsewhere for this purpose alone. - // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor - // until after we release the lock, leaving room for a collection to be sharded in-between. - auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); - uassert(4567, - str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", - !css->getMetadata(expCtx->opCtx)->isSharded()); - - PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - - // Optimize again, since there may be additional optimizations that can be done after adding - // the initial cursor stage. - pipeline->optimizePipeline(); - - return Status::OK(); -} - -std::string MongoDInterface::getShardName(OperationContext* opCtx) const { - if (ShardingState::get(opCtx)->enabled()) { - return ShardingState::get(opCtx)->shardId().toString(); - } - - return std::string(); -} - -std::pair, bool> MongoDInterface::collectDocumentKeyFields( - OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { - return {{"_id"}, false}; // Nothing is sharded. - } - boost::optional uuid; - NamespaceString nss; - if (nssOrUUID.uuid()) { - uuid = *(nssOrUUID.uuid()); - nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(*uuid); - // An empty namespace indicates that the collection has been dropped. Treat it as unsharded - // and mark the fields as final. - if (nss.isEmpty()) { - return {{"_id"}, true}; - } - } else if (nssOrUUID.nss()) { - nss = *(nssOrUUID.nss()); - } - - // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache - // to determine whether the collection is sharded in the first place. - auto catalogCache = Grid::get(opCtx)->catalogCache(); - - const bool collectionIsSharded = catalogCache && [&]() { - auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); - return routingInfo.isOK() && routingInfo.getValue().cm(); - }(); - - // Collection exists and is not sharded, mark as not final. - if (!collectionIsSharded) { - return {{"_id"}, false}; - } - - auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { - AutoGetCollection autoColl(opCtx, nss, MODE_IS); - return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); - }(); - - // If the UUID is set in 'nssOrUuid', check that the UUID in the ScopedCollectionMetadata - // matches. Otherwise, this implies that the collection has been dropped and recreated as - // sharded. - if (!scm->isSharded() || (uuid && !scm->uuidMatches(*uuid))) { - return {{"_id"}, false}; - } - - // Unpack the shard key. - std::vector result; - bool gotId = false; - for (auto& field : scm->getKeyPatternFields()) { - result.emplace_back(field->dottedField()); - gotId |= (result.back().fullPath() == "_id"); - } - if (!gotId) { // If not part of the shard key, "_id" comes last. - result.emplace_back("_id"); - } - // Collection is now sharded so the document key fields will never change, mark as final. - return {result, true}; -} - -std::vector MongoDInterface::getIdleCursors( - const intrusive_ptr& expCtx, CurrentOpUserMode userMode) const { - return CursorManager::getIdleCursors(expCtx->opCtx, userMode); -} - -boost::optional MongoDInterface::lookupSingleDocument( - const boost::intrusive_ptr& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional readConcern) { - invariant(!readConcern); // We don't currently support a read concern on mongod - it's only - // expected to be necessary on mongos. - - std::unique_ptr pipeline; - try { - // Be sure to do the lookup using the collection default collation - auto foreignExpCtx = expCtx->copyWith( - nss, - collectionUUID, - _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); - } catch (const ExceptionFor&) { - return boost::none; - } - - auto lookedUpDocument = pipeline->getNext(); - if (auto next = pipeline->getNext()) { - uasserted(ErrorCodes::TooManyMatchingDocuments, - str::stream() << "found more than one document with document key " - << documentKey.toString() - << " [" - << lookedUpDocument->toString() - << ", " - << next->toString() - << "]"); - } - return lookedUpDocument; -} - -BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) { - auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); - if (backupCursorHooks->enabled()) { - return backupCursorHooks->openBackupCursor(opCtx); - } else { - uasserted(50956, "Backup cursors are an enterprise only feature."); - } -} - -void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) { - auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); - if (backupCursorHooks->enabled()) { - backupCursorHooks->closeBackupCursor(opCtx, cursorId); - } else { - uasserted(50955, "Backup cursors are an enterprise only feature."); - } -} - -std::vector MongoDInterface::getMatchingPlanCacheEntryStats( - OperationContext* opCtx, const NamespaceString& nss, const MatchExpression* matchExp) const { - const auto serializer = [](const PlanCacheEntry& entry) { - BSONObjBuilder out; - Explain::planCacheEntryToBSON(entry, &out); - return out.obj(); - }; - - const auto predicate = [&matchExp](const BSONObj& obj) { - return !matchExp ? true : matchExp->matchesBSON(obj); - }; - - AutoGetCollection autoColl(opCtx, nss, MODE_IS); - const auto collection = autoColl.getCollection(); - uassert( - 50933, str::stream() << "collection '" << nss.toString() << "' does not exist", collection); - - const auto infoCache = collection->infoCache(); - invariant(infoCache); - const auto planCache = infoCache->getPlanCache(); - invariant(planCache); - - return planCache->getMatchingStats(serializer, predicate); -} - -bool MongoDInterface::uniqueKeyIsSupportedByIndex( - const boost::intrusive_ptr& expCtx, - const NamespaceString& nss, - const std::set& uniqueKeyPaths) const { - auto* opCtx = expCtx->opCtx; - // We purposefully avoid a helper like AutoGetCollection here because we don't want to check the - // db version or do anything else. We simply want to protect against concurrent modifications to - // the catalog. - Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); - Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); - const auto* collection = [&]() -> Collection* { - auto db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.db()); - return db ? db->getCollection(opCtx, nss) : nullptr; - }(); - if (!collection) { - return uniqueKeyPaths == std::set{"_id"}; - } - - auto indexIterator = collection->getIndexCatalog()->getIndexIterator(opCtx, false); - while (indexIterator.more()) { - IndexDescriptor* descriptor = indexIterator.next(); - if (supportsUniqueKey(expCtx, indexIterator.catalogEntry(descriptor), uniqueKeyPaths)) { - return true; - } - } - return false; -} - -BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps) const { - BSONObjBuilder builder; - - CurOp::reportCurrentOpForClient( - opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); - - OperationContext* clientOpCtx = client->getOperationContext(); - - if (clientOpCtx) { - if (auto txnParticipant = TransactionParticipant::get(clientOpCtx)) { - txnParticipant->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder); - } - - // Append lock stats before returning. - if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo( - CurOp::get(*clientOpCtx)->getLockStatsBase())) { - fillLockerInfo(*lockerInfo, builder); - } - } - - return builder.obj(); -} - -void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector* ops) const { - auto sessionCatalog = SessionCatalog::get(opCtx); - - const bool authEnabled = - AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); - - // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to - // create a pattern that will match against all authenticated usernames for the current client. - // If the user is listing ops for all users, we create an empty pattern; constructing an - // instance of SessionKiller::Matcher with this empty pattern will return all sessions. - auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers - ? makeSessionFilterForAuthenticatedUsers(opCtx) - : KillAllSessionsByPatternSet{{}}); - - sessionCatalog->scanSessions( - opCtx, - {std::move(sessionFilter)}, - [&](OperationContext* opCtx, Session* session) { - auto op = - TransactionParticipant::getFromNonCheckedOutSession(session)->reportStashedState(); - if (!op.isEmpty()) { - ops->emplace_back(op); - } - }); -} - -std::unique_ptr MongoDInterface::_getCollectionDefaultCollator( - OperationContext* opCtx, StringData dbName, UUID collectionUUID) { - auto it = _collatorCache.find(collectionUUID); - if (it == _collatorCache.end()) { - auto collator = [&]() -> std::unique_ptr { - AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS); - if (!autoColl.getCollection()) { - // This collection doesn't exist, so assume a nullptr default collation - return nullptr; - } else { - auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); - // Clone the collator so that we can safely use the pointer if the collection - // disappears right after we release the lock. - return defaultCollator ? defaultCollator->clone() : nullptr; - } - }(); - - it = _collatorCache.emplace(collectionUUID, std::move(collator)).first; - } - - auto& collator = it->second; - return collator ? collator->clone() : nullptr; -} - -void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& objs) { - BatchedCommandResponse response; - BatchWriteExecStats stats; - - ClusterWriter::write( - expCtx->opCtx, - BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)), - &stats, - &response); - // TODO SERVER-35403: Add more context for which shard produced the error. - uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); -} - -void MongoDInterfaceShardServer::update(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& queries, - std::vector&& updates, - bool upsert, - bool multi) { - BatchedCommandResponse response; - BatchWriteExecStats stats; - ClusterWriter::write(expCtx->opCtx, - BatchedCommandRequest(buildUpdateOp(ns, - std::move(queries), - std::move(updates), - upsert, - multi, - expCtx->bypassDocumentValidation)), - &stats, - &response); - // TODO SERVER-35403: Add more context for which shard produced the error. - uassertStatusOKWithContext(response.toStatus(), "Update failed: "); -} - -} // namespace mongo diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h deleted file mode 100644 index cb9614f2172..00000000000 --- a/src/mongo/db/pipeline/mongod_process_interface.h +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Copyright (C) 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/pipeline/mongo_process_common.h" -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { - -/** - * Class to provide access to mongod-specific implementations of methods required by some - * document sources. - */ -class MongoDInterface : public MongoProcessCommon { -public: - static std::shared_ptr create(OperationContext* opCtx); - - MongoDInterface(OperationContext* opCtx); - - virtual ~MongoDInterface() = default; - - void setOperationContext(OperationContext* opCtx) final; - DBClientBase* directClient() final; - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; - virtual void insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& objs); - virtual void update(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& queries, - std::vector&& updates, - bool upsert, - bool multi); - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; - void appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const final; - Status appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const final; - Status appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const final; - BSONObj getCollectionOptions(const NamespaceString& nss) final; - void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list& originalIndexes) final; - StatusWith> makePipeline( - const std::vector& rawPipeline, - const boost::intrusive_ptr& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) final; - Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, - Pipeline* pipeline) final; - std::string getShardName(OperationContext* opCtx) const final; - std::pair, bool> collectDocumentKeyFields( - OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const final; - boost::optional lookupSingleDocument( - const boost::intrusive_ptr& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional readConcern) final; - std::vector getIdleCursors(const boost::intrusive_ptr& expCtx, - CurrentOpUserMode userMode) const final; - BackupCursorState openBackupCursor(OperationContext* opCtx) final; - void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final; - - std::vector getMatchingPlanCacheEntryStats(OperationContext*, - const NamespaceString&, - const MatchExpression*) const final; - - bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr& expCtx, - const NamespaceString& nss, - const std::set& uniqueKeyPaths) const final; - -protected: - BSONObj _reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps) const final; - - void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector* ops) const final; - -private: - /** - * Looks up the collection default collator for the collection given by 'collectionUUID'. A - * collection's default collation is not allowed to change, so we cache the result to allow - * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr' - * if the collection does not exist or if the collection's default collation is the simple - * collation. - */ - std::unique_ptr _getCollectionDefaultCollator(OperationContext* opCtx, - StringData dbName, - UUID collectionUUID); - - DBDirectClient _client; - std::map> _collatorCache; -}; - -/** - * Specialized version of the MongoDInterface when this node is a shard server. - */ -class MongoDInterfaceShardServer final : public MongoDInterface { -public: - using MongoDInterface::MongoDInterface; - - /** - * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ - void insert(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& objs) final; - - /** - * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ - void update(const boost::intrusive_ptr& expCtx, - const NamespaceString& ns, - std::vector&& queries, - std::vector&& updates, - bool upsert, - bool multi) final; -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_factory_mongod.cpp b/src/mongo/db/pipeline/process_interface_factory_mongod.cpp new file mode 100644 index 00000000000..04e93b15d4d --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_factory_mongod.cpp @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_shardsvr.h" + +#include "mongo/db/s/sharding_state.h" + +namespace mongo { + +MONGO_REGISTER_SHIM(MongoProcessInterface::create) +(OperationContext* opCtx)->std::shared_ptr { + return ShardingState::get(opCtx)->enabled() ? std::make_shared(opCtx) + : std::make_shared(opCtx); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp new file mode 100644 index 00000000000..e86b7202d2b --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -0,0 +1,245 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_shardsvr.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog/uuid_catalog.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/stats/storage_stats.h" +#include "mongo/db/storage/backup_cursor_hooks.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" +#include "mongo/s/write_ops/cluster_write.h" +#include "mongo/util/log.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using write_ops::Insert; +using write_ops::Update; +using write_ops::UpdateOpEntry; + +namespace { + +/** + * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. + */ +Insert buildInsertOp(const NamespaceString& nss, + std::vector&& objs, + bool bypassDocValidation) { + Insert insertOp(nss); + insertOp.setDocuments(std::move(objs)); + insertOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return insertOp; +} + +/** + * Builds an ordered update op on namespace 'nss' with update entries {q: , u: }. + * + * Note that 'queries' and 'updates' must be the same length. + */ +Update buildUpdateOp(const NamespaceString& nss, + std::vector&& queries, + std::vector&& updates, + bool upsert, + bool multi, + bool bypassDocValidation) { + Update updateOp(nss); + updateOp.setUpdates([&] { + std::vector updateEntries; + for (size_t index = 0; index < queries.size(); ++index) { + updateEntries.push_back([&] { + UpdateOpEntry entry; + entry.setQ(std::move(queries[index])); + entry.setU(std::move(updates[index])); + entry.setUpsert(upsert); + entry.setMulti(multi); + return entry; + }()); + } + return updateEntries; + }()); + updateOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return updateOp; +} + +// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each +// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of +// index. +bool keyPatternNamesExactPaths(const BSONObj& keyPattern, + const std::set& uniqueKeyPaths) { + size_t nFieldsMatched = 0; + for (auto&& elem : keyPattern) { + if (!elem.isNumber()) { + return false; + } + if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) { + return false; + } + ++nFieldsMatched; + } + return nFieldsMatched == uniqueKeyPaths.size(); +} + +bool supportsUniqueKey(const boost::intrusive_ptr& expCtx, + const IndexCatalogEntry* index, + const std::set& uniqueKeyPaths) { + return (index->descriptor()->unique() && !index->descriptor()->isPartial() && + keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) && + CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator())); +} + +} // namespace + +std::pair, bool> MongoInterfaceShardServer::collectDocumentKeyFields( + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + + boost::optional uuid; + NamespaceString nss; + if (nssOrUUID.uuid()) { + uuid = *(nssOrUUID.uuid()); + nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(*uuid); + // An empty namespace indicates that the collection has been dropped. Treat it as unsharded + // and mark the fields as final. + if (nss.isEmpty()) { + return {{"_id"}, true}; + } + } else if (nssOrUUID.nss()) { + nss = *(nssOrUUID.nss()); + } + + // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache + // to determine whether the collection is sharded in the first place. + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + const bool collectionIsSharded = catalogCache && [&]() { + auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); + return routingInfo.isOK() && routingInfo.getValue().cm(); + }(); + + // Collection exists and is not sharded, mark as not final. + if (!collectionIsSharded) { + return {{"_id"}, false}; + } + + auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); + }(); + + // If the UUID is set in 'nssOrUuid', check that the UUID in the ScopedCollectionMetadata + // matches. Otherwise, this implies that the collection has been dropped and recreated as + // sharded. + if (!scm->isSharded() || (uuid && !scm->uuidMatches(*uuid))) { + return {{"_id"}, false}; + } + + // Unpack the shard key. + std::vector result; + bool gotId = false; + for (auto& field : scm->getKeyPatternFields()) { + result.emplace_back(field->dottedField()); + gotId |= (result.back().fullPath() == "_id"); + } + if (!gotId) { // If not part of the shard key, "_id" comes last. + result.emplace_back("_id"); + } + // Collection is now sharded so the document key fields will never change, mark as final. + return {result, true}; +} + +void MongoInterfaceShardServer::insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& objs) { + BatchedCommandResponse response; + BatchWriteExecStats stats; + + ClusterWriter::write( + expCtx->opCtx, + BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)), + &stats, + &response); + // TODO SERVER-35403: Add more context for which shard produced the error. + uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); +} + +void MongoInterfaceShardServer::update(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& queries, + std::vector&& updates, + bool upsert, + bool multi) { + BatchedCommandResponse response; + BatchWriteExecStats stats; + ClusterWriter::write(expCtx->opCtx, + BatchedCommandRequest(buildUpdateOp(ns, + std::move(queries), + std::move(updates), + upsert, + multi, + expCtx->bypassDocumentValidation)), + &stats, + &response); + // TODO SERVER-35403: Add more context for which shard produced the error. + uassertStatusOKWithContext(response.toStatus(), "Update failed: "); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h new file mode 100644 index 00000000000..4685e27735d --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface_standalone.h" + +namespace mongo { + +/** + * Specialized version of the MongoDInterface when this node is a shard server. + */ +class MongoInterfaceShardServer final : public MongoInterfaceStandalone { +public: + using MongoInterfaceStandalone::MongoInterfaceStandalone; + + std::pair, bool> collectDocumentKeyFields( + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const final; + + /** + * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, + * routing, stale config handling, etc. + */ + void insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& objs) final; + + /** + * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, + * routing, stale config handling, etc. + */ + void update(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& queries, + std::vector&& updates, + bool upsert, + bool multi) final; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp new file mode 100644 index 00000000000..cb1a2a9a803 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -0,0 +1,527 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_standalone.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog/uuid_catalog.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/stats/storage_stats.h" +#include "mongo/db/storage/backup_cursor_hooks.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" +#include "mongo/s/write_ops/cluster_write.h" +#include "mongo/util/log.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using write_ops::Insert; +using write_ops::Update; +using write_ops::UpdateOpEntry; + +namespace { + +/** + * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. + */ +Insert buildInsertOp(const NamespaceString& nss, + std::vector&& objs, + bool bypassDocValidation) { + Insert insertOp(nss); + insertOp.setDocuments(std::move(objs)); + insertOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return insertOp; +} + +/** + * Builds an ordered update op on namespace 'nss' with update entries {q: , u: }. + * + * Note that 'queries' and 'updates' must be the same length. + */ +Update buildUpdateOp(const NamespaceString& nss, + std::vector&& queries, + std::vector&& updates, + bool upsert, + bool multi, + bool bypassDocValidation) { + Update updateOp(nss); + updateOp.setUpdates([&] { + std::vector updateEntries; + for (size_t index = 0; index < queries.size(); ++index) { + updateEntries.push_back([&] { + UpdateOpEntry entry; + entry.setQ(std::move(queries[index])); + entry.setU(std::move(updates[index])); + entry.setUpsert(upsert); + entry.setMulti(multi); + return entry; + }()); + } + return updateEntries; + }()); + updateOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return updateOp; +} + +// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each +// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of +// index. +bool keyPatternNamesExactPaths(const BSONObj& keyPattern, + const std::set& uniqueKeyPaths) { + size_t nFieldsMatched = 0; + for (auto&& elem : keyPattern) { + if (!elem.isNumber()) { + return false; + } + if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) { + return false; + } + ++nFieldsMatched; + } + return nFieldsMatched == uniqueKeyPaths.size(); +} + +bool supportsUniqueKey(const boost::intrusive_ptr& expCtx, + const IndexCatalogEntry* index, + const std::set& uniqueKeyPaths) { + return (index->descriptor()->unique() && !index->descriptor()->isPartial() && + keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) && + CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator())); +} + +} // namespace + +MongoInterfaceStandalone::MongoInterfaceStandalone(OperationContext* opCtx) : _client(opCtx) {} + +void MongoInterfaceStandalone::setOperationContext(OperationContext* opCtx) { + _client.setOpCtx(opCtx); +} + +DBClientBase* MongoInterfaceStandalone::directClient() { + return &_client; +} + +bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) { + AutoGetCollectionForReadCommand autoColl(opCtx, nss); + auto const css = CollectionShardingState::get(opCtx, nss); + return css->getMetadata(opCtx)->isSharded(); +} + +void MongoInterfaceStandalone::insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& objs) { + auto writeResults = performInserts( + expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + + // Only need to check that the final result passed because the inserts are ordered and the batch + // will stop on the first failure. + uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: "); +} + +void MongoInterfaceStandalone::update(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& queries, + std::vector&& updates, + bool upsert, + bool multi) { + auto writeResults = performUpdates(expCtx->opCtx, + buildUpdateOp(ns, + std::move(queries), + std::move(updates), + upsert, + multi, + expCtx->bypassDocumentValidation)); + + // Only need to check that the final result passed because the updates are ordered and the batch + // will stop on the first failure. + uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: "); +} + +CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) { + AutoGetCollectionForReadCommand autoColl(opCtx, ns); + + Collection* collection = autoColl.getCollection(); + if (!collection) { + LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); + return CollectionIndexUsageMap(); + } + + return collection->infoCache()->getIndexUsageStats(); +} + +void MongoInterfaceStandalone::appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const { + Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); +} + +Status MongoInterfaceStandalone::appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const { + return appendCollectionStorageStats(opCtx, nss, param, builder); +} + +Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const { + return appendCollectionRecordCount(opCtx, nss, builder); +} + +BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) { + const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); +} + +void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list& originalIndexes) { + Lock::GlobalWrite globalLock(opCtx); + + uassert(ErrorCodes::CommandFailed, + str::stream() << "collection options of target collection " << targetNs.ns() + << " changed during processing. Original options: " + << originalCollectionOptions + << ", new options: " + << getCollectionOptions(targetNs), + SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions == + getCollectionOptions(targetNs))); + + auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); + uassert(ErrorCodes::CommandFailed, + str::stream() << "indexes of target collection " << targetNs.ns() + << " changed during processing.", + originalIndexes.size() == currentIndexes.size() && + std::equal(originalIndexes.begin(), + originalIndexes.end(), + currentIndexes.begin(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); + + BSONObj info; + uassert(ErrorCodes::CommandFailed, + str::stream() << "renameCollection failed: " << info, + _client.runCommand("admin", renameCommandObj, info)); +} + +StatusWith> MongoInterfaceStandalone::makePipeline( + const std::vector& rawPipeline, + const boost::intrusive_ptr& expCtx, + const MakePipelineOptions opts) { + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + Status cursorStatus = Status::OK(); + + if (opts.attachCursorSource) { + cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + } + + return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; +} + +Status MongoInterfaceStandalone::attachCursorSourceToPipeline( + const boost::intrusive_ptr& expCtx, Pipeline* pipeline) { + invariant(pipeline->getSources().empty() || + !dynamic_cast(pipeline->getSources().front().get())); + + boost::optional autoColl; + if (expCtx->uuid) { + try { + autoColl.emplace(expCtx->opCtx, + NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}); + } catch (const ExceptionFor& ex) { + // The UUID doesn't exist anymore + return ex.toStatus(); + } + } else { + autoColl.emplace(expCtx->opCtx, expCtx->ns); + } + + // makePipeline() is only called to perform secondary aggregation requests and expects the + // collection representing the document source to be not-sharded. We confirm sharding state + // here to avoid taking a collection lock elsewhere for this purpose alone. + // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor + // until after we release the lock, leaving room for a collection to be sharded in-between. + auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); + uassert(4567, + str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", + !css->getMetadata(expCtx->opCtx)->isSharded()); + + PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); + + // Optimize again, since there may be additional optimizations that can be done after adding + // the initial cursor stage. + pipeline->optimizePipeline(); + + return Status::OK(); +} + +std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const { + if (ShardingState::get(opCtx)->enabled()) { + return ShardingState::get(opCtx)->shardId().toString(); + } + + return std::string(); +} + +std::pair, bool> MongoInterfaceStandalone::collectDocumentKeyFields( + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { + return {{"_id"}, false}; // Nothing is sharded. +} + +std::vector MongoInterfaceStandalone::getIdleCursors( + const intrusive_ptr& expCtx, CurrentOpUserMode userMode) const { + return CursorManager::getIdleCursors(expCtx->opCtx, userMode); +} + +boost::optional MongoInterfaceStandalone::lookupSingleDocument( + const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional readConcern) { + invariant(!readConcern); // We don't currently support a read concern on mongod - it's only + // expected to be necessary on mongos. + + std::unique_ptr pipeline; + try { + // Be sure to do the lookup using the collection default collation + auto foreignExpCtx = expCtx->copyWith( + nss, + collectionUUID, + _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); + pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); + } catch (const ExceptionFor&) { + return boost::none; + } + + auto lookedUpDocument = pipeline->getNext(); + if (auto next = pipeline->getNext()) { + uasserted(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document with document key " + << documentKey.toString() + << " [" + << lookedUpDocument->toString() + << ", " + << next->toString() + << "]"); + } + return lookedUpDocument; +} + +BackupCursorState MongoInterfaceStandalone::openBackupCursor(OperationContext* opCtx) { + auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); + if (backupCursorHooks->enabled()) { + return backupCursorHooks->openBackupCursor(opCtx); + } else { + uasserted(50956, "Backup cursors are an enterprise only feature."); + } +} + +void MongoInterfaceStandalone::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) { + auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); + if (backupCursorHooks->enabled()) { + backupCursorHooks->closeBackupCursor(opCtx, cursorId); + } else { + uasserted(50955, "Backup cursors are an enterprise only feature."); + } +} + +std::vector MongoInterfaceStandalone::getMatchingPlanCacheEntryStats( + OperationContext* opCtx, const NamespaceString& nss, const MatchExpression* matchExp) const { + const auto serializer = [](const PlanCacheEntry& entry) { + BSONObjBuilder out; + Explain::planCacheEntryToBSON(entry, &out); + return out.obj(); + }; + + const auto predicate = [&matchExp](const BSONObj& obj) { + return !matchExp ? true : matchExp->matchesBSON(obj); + }; + + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + const auto collection = autoColl.getCollection(); + uassert( + 50933, str::stream() << "collection '" << nss.toString() << "' does not exist", collection); + + const auto infoCache = collection->infoCache(); + invariant(infoCache); + const auto planCache = infoCache->getPlanCache(); + invariant(planCache); + + return planCache->getMatchingStats(serializer, predicate); +} + +bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex( + const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + const std::set& uniqueKeyPaths) const { + auto* opCtx = expCtx->opCtx; + // We purposefully avoid a helper like AutoGetCollection here because we don't want to check the + // db version or do anything else. We simply want to protect against concurrent modifications to + // the catalog. + Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); + Lock::CollectionLock collLock(opCtx->lockState(), nss.ns(), MODE_IS); + const auto* collection = [&]() -> Collection* { + auto db = DatabaseHolder::getDatabaseHolder().get(opCtx, nss.db()); + return db ? db->getCollection(opCtx, nss) : nullptr; + }(); + if (!collection) { + return uniqueKeyPaths == std::set{"_id"}; + } + + auto indexIterator = collection->getIndexCatalog()->getIndexIterator(opCtx, false); + while (indexIterator.more()) { + IndexDescriptor* descriptor = indexIterator.next(); + if (supportsUniqueKey(expCtx, indexIterator.catalogEntry(descriptor), uniqueKeyPaths)) { + return true; + } + } + return false; +} + +BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( + OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { + BSONObjBuilder builder; + + CurOp::reportCurrentOpForClient( + opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); + + OperationContext* clientOpCtx = client->getOperationContext(); + + if (clientOpCtx) { + if (auto txnParticipant = TransactionParticipant::get(clientOpCtx)) { + txnParticipant->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder); + } + + // Append lock stats before returning. + if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo( + CurOp::get(*clientOpCtx)->getLockStatsBase())) { + fillLockerInfo(*lockerInfo, builder); + } + } + + return builder.obj(); +} + +void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector* ops) const { + auto sessionCatalog = SessionCatalog::get(opCtx); + + const bool authEnabled = + AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); + + // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to + // create a pattern that will match against all authenticated usernames for the current client. + // If the user is listing ops for all users, we create an empty pattern; constructing an + // instance of SessionKiller::Matcher with this empty pattern will return all sessions. + auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers + ? makeSessionFilterForAuthenticatedUsers(opCtx) + : KillAllSessionsByPatternSet{{}}); + + sessionCatalog->scanSessions( + opCtx, + {std::move(sessionFilter)}, + [&](OperationContext* opCtx, Session* session) { + auto op = + TransactionParticipant::getFromNonCheckedOutSession(session)->reportStashedState(); + if (!op.isEmpty()) { + ops->emplace_back(op); + } + }); +} + +std::unique_ptr MongoInterfaceStandalone::_getCollectionDefaultCollator( + OperationContext* opCtx, StringData dbName, UUID collectionUUID) { + auto it = _collatorCache.find(collectionUUID); + if (it == _collatorCache.end()) { + auto collator = [&]() -> std::unique_ptr { + AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS); + if (!autoColl.getCollection()) { + // This collection doesn't exist, so assume a nullptr default collation + return nullptr; + } else { + auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); + // Clone the collator so that we can safely use the pointer if the collection + // disappears right after we release the lock. + return defaultCollator ? defaultCollator->clone() : nullptr; + } + }(); + + it = _collatorCache.emplace(collectionUUID, std::move(collator)).first; + } + + auto& collator = it->second; + return collator ? collator->clone() : nullptr; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h new file mode 100644 index 00000000000..981ca2b35c3 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/pipeline/mongo_process_common.h" +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { + +/** + * Class to provide access to mongod-specific implementations of methods required by some + * document sources. + */ +class MongoInterfaceStandalone : public MongoProcessCommon { +public: + // static std::shared_ptr create(OperationContext* opCtx); + + MongoInterfaceStandalone(OperationContext* opCtx); + + virtual ~MongoInterfaceStandalone() = default; + + void setOperationContext(OperationContext* opCtx) final; + DBClientBase* directClient() final; + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; + void insert(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& objs) override; + void update(const boost::intrusive_ptr& expCtx, + const NamespaceString& ns, + std::vector&& queries, + std::vector&& updates, + bool upsert, + bool multi) override; + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; + void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const final; + Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const final; + Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const final; + BSONObj getCollectionOptions(const NamespaceString& nss) final; + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list& originalIndexes) final; + StatusWith> makePipeline( + const std::vector& rawPipeline, + const boost::intrusive_ptr& expCtx, + const MakePipelineOptions opts = MakePipelineOptions{}) final; + Status attachCursorSourceToPipeline(const boost::intrusive_ptr& expCtx, + Pipeline* pipeline) final; + std::string getShardName(OperationContext* opCtx) const final; + std::pair, bool> collectDocumentKeyFields( + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const override; + boost::optional lookupSingleDocument( + const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional readConcern) final; + std::vector getIdleCursors(const boost::intrusive_ptr& expCtx, + CurrentOpUserMode userMode) const final; + BackupCursorState openBackupCursor(OperationContext* opCtx) final; + void closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) final; + + std::vector getMatchingPlanCacheEntryStats(OperationContext*, + const NamespaceString&, + const MatchExpression*) const final; + + bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr& expCtx, + const NamespaceString& nss, + const std::set& uniqueKeyPaths) const final; + +protected: + BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const final; + + void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector* ops) const final; + +private: + /** + * Looks up the collection default collator for the collection given by 'collectionUUID'. A + * collection's default collation is not allowed to change, so we cache the result to allow + * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr' + * if the collection does not exist or if the collection's default collation is the simple + * collation. + */ + std::unique_ptr _getCollectionDefaultCollator(OperationContext* opCtx, + StringData dbName, + UUID collectionUUID); + + DBDirectClient _client; + std::map> _collatorCache; +}; + +} // namespace mongo diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript index 26bd7f7f80a..5b54fadee8d 100644 --- a/src/mongo/embedded/SConscript +++ b/src/mongo/embedded/SConscript @@ -66,6 +66,7 @@ env.Library( 'embedded_options_parser_init.cpp', 'logical_session_cache_factory_embedded.cpp', 'periodic_runner_embedded.cpp', + 'process_interface_factory_embedded.cpp', 'replication_coordinator_embedded.cpp', 'service_entry_point_embedded.cpp', ], @@ -84,6 +85,7 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/op_observer_impl', + '$BUILD_DIR/mongo/db/pipeline/process_interface_standalone', '$BUILD_DIR/mongo/db/repair_database_and_check_version', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/replica_set_messages', diff --git a/src/mongo/embedded/process_interface_factory_embedded.cpp b/src/mongo/embedded/process_interface_factory_embedded.cpp new file mode 100644 index 00000000000..1a59855ccfe --- /dev/null +++ b/src/mongo/embedded/process_interface_factory_embedded.cpp @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_standalone.h" + +namespace mongo { + +MONGO_REGISTER_SHIM(MongoProcessInterface::create) +(OperationContext* opCtx)->std::shared_ptr { + return std::make_shared(opCtx); +} + +} // namespace mongo -- cgit v1.2.1