diff options
author | Henrik Edin <henrik.edin@mongodb.com> | 2018-09-24 14:10:22 -0400 |
---|---|---|
committer | Henrik Edin <henrik.edin@mongodb.com> | 2018-10-03 14:46:14 -0400 |
commit | 58501015eff7961dc378abe1d49e064a3dcf3dbc (patch) | |
tree | e0b41dd135ec7657605ca87e99b9ef2d3fa3baed /src/mongo | |
parent | 860b392d9d3c006090a4c7fc3c6f3fa5460e5c5c (diff) | |
download | mongo-58501015eff7961dc378abe1d49e064a3dcf3dbc.tar.gz |
SERVER-37295 Remove embedded dependency on process interface shard server.
Refactor process interface system to use shim to allow for separate factories for embedded and mongod.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 37 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_interface.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/pipeline/mongo_process_interface.h | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_factory_mongod.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_shardsvr.cpp | 245 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_shardsvr.h | 67 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp (renamed from src/mongo/db/pipeline/mongod_process_interface.cpp) | 190 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.h (renamed from src/mongo/db/pipeline/mongod_process_interface.h) | 55 | ||||
-rw-r--r-- | src/mongo/embedded/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/embedded/process_interface_factory_embedded.cpp | 40 |
13 files changed, 539 insertions, 192 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 7d82797d5e1..593923ce799 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -314,6 +314,7 @@ env.Library( 'db/mongodandmongos', 'db/periodic_runner_job_abort_expired_transactions', 'db/periodic_runner_job_decrease_snapshot_cache_pressure', + 'db/pipeline/process_interface_factory_mongod', 'db/query_exec', 'db/repair_database', 'db/repair_database_and_check_version', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 984e9ce80a2..ec5b2339ae6 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -243,7 +243,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', '$BUILD_DIR/mongo/db/ops/write_ops_exec', - '$BUILD_DIR/mongo/db/pipeline/mongod_process_interface', + '$BUILD_DIR/mongo/db/pipeline/mongo_process_interface', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/rw_concern_d', '$BUILD_DIR/mongo/db/stats/counters', diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 7970dcac23e..ff5e0268dd6 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -49,7 +49,7 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" -#include "mongo/db/pipeline/mongod_process_interface.h" +#include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/query/collation/collator_factory_interface.h" @@ -328,7 +328,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext( new ExpressionContext(opCtx, request, std::move(collator), - MongoDInterface::create(opCtx), + MongoProcessInterface::create(opCtx), uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)), uuid); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index d32e34b061e..8da8342dbbe 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -264,6 +264,16 @@ env.Library( ) env.Library( + target='mongo_process_interface', + source=[ + 'mongo_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ] +) + +env.Library( target='mongo_process_common', source=[ 'mongo_process_common.cpp', @@ -276,15 +286,13 @@ env.Library( ) env.Library( - target='mongod_process_interface', + target='process_interface_standalone', source=[ - 'mongod_process_interface.cpp', + 'process_interface_standalone.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/query_exec', - '$BUILD_DIR/mongo/db/stats/top', - '$BUILD_DIR/mongo/s/sharding_api', 'mongo_process_common', ], LIBDEPS_PRIVATE=[ @@ -293,6 +301,17 @@ env.Library( ) env.Library( + target='process_interface_shardsvr', + source=[ + 'process_interface_shardsvr.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/s/sharding_api', + 'process_interface_standalone', + ], +) + +env.Library( target='mongos_process_interface', source=[ 'mongos_process_interface.cpp', @@ -304,6 +323,16 @@ env.Library( ] ) +env.Library( + target="process_interface_factory_mongod", + source=[ + "process_interface_factory_mongod.cpp", + ], + LIBDEPS_PRIVATE=[ + 'process_interface_shardsvr', + ], +) + pipelineeEnv = env.Clone() pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) pipelineeEnv.Library( diff --git a/src/mongo/db/pipeline/mongo_process_interface.cpp b/src/mongo/db/pipeline/mongo_process_interface.cpp new file mode 100644 index 00000000000..547c1760148 --- /dev/null +++ b/src/mongo/db/pipeline/mongo_process_interface.cpp @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo_process_interface.h" + +namespace mongo { + +MONGO_DEFINE_SHIM(MongoProcessInterface::create); + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 974fd2807d4..664c956c33a 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -35,6 +35,7 @@ #include <string> #include <vector> +#include "mongo/base/shim.h" #include "mongo/client/dbclient_base.h" #include "mongo/db/collection_index_usage_tracker.h" #include "mongo/db/generic_cursor.h" @@ -68,6 +69,13 @@ public: enum class CurrentOpSessionsMode { kIncludeIdle, kExcludeIdle }; enum class CurrentOpCursorMode { kIncludeCursors, kExcludeCursors }; + /** + * Factory function to create MongoProcessInterface of the right type. The implementation will + * be installed by a lib higher up in the link graph depending on the application type. + */ + static MONGO_DECLARE_SHIM( + (OperationContext * opCtx)->std::shared_ptr<MongoProcessInterface>) create; + struct MakePipelineOptions { MakePipelineOptions(){}; diff --git a/src/mongo/db/pipeline/process_interface_factory_mongod.cpp b/src/mongo/db/pipeline/process_interface_factory_mongod.cpp new file mode 100644 index 00000000000..04e93b15d4d --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_factory_mongod.cpp @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_shardsvr.h" + +#include "mongo/db/s/sharding_state.h" + +namespace mongo { + +MONGO_REGISTER_SHIM(MongoProcessInterface::create) +(OperationContext* opCtx)->std::shared_ptr<MongoProcessInterface> { + return ShardingState::get(opCtx)->enabled() ? std::make_shared<MongoInterfaceShardServer>(opCtx) + : std::make_shared<MongoInterfaceStandalone>(opCtx); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp new file mode 100644 index 00000000000..e86b7202d2b --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -0,0 +1,245 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_shardsvr.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog/uuid_catalog.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/stats/storage_stats.h" +#include "mongo/db/storage/backup_cursor_hooks.h" +#include "mongo/db/transaction_participant.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" +#include "mongo/s/write_ops/cluster_write.h" +#include "mongo/util/log.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using write_ops::Insert; +using write_ops::Update; +using write_ops::UpdateOpEntry; + +namespace { + +/** + * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. + */ +Insert buildInsertOp(const NamespaceString& nss, + std::vector<BSONObj>&& objs, + bool bypassDocValidation) { + Insert insertOp(nss); + insertOp.setDocuments(std::move(objs)); + insertOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return insertOp; +} + +/** + * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}. + * + * Note that 'queries' and 'updates' must be the same length. + */ +Update buildUpdateOp(const NamespaceString& nss, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi, + bool bypassDocValidation) { + Update updateOp(nss); + updateOp.setUpdates([&] { + std::vector<UpdateOpEntry> updateEntries; + for (size_t index = 0; index < queries.size(); ++index) { + updateEntries.push_back([&] { + UpdateOpEntry entry; + entry.setQ(std::move(queries[index])); + entry.setU(std::move(updates[index])); + entry.setUpsert(upsert); + entry.setMulti(multi); + return entry; + }()); + } + return updateEntries; + }()); + updateOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(true); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return updateOp; +} + +// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each +// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of +// index. +bool keyPatternNamesExactPaths(const BSONObj& keyPattern, + const std::set<FieldPath>& uniqueKeyPaths) { + size_t nFieldsMatched = 0; + for (auto&& elem : keyPattern) { + if (!elem.isNumber()) { + return false; + } + if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) { + return false; + } + ++nFieldsMatched; + } + return nFieldsMatched == uniqueKeyPaths.size(); +} + +bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const IndexCatalogEntry* index, + const std::set<FieldPath>& uniqueKeyPaths) { + return (index->descriptor()->unique() && !index->descriptor()->isPartial() && + keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) && + CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator())); +} + +} // namespace + +std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields( + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { + invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); + + boost::optional<UUID> uuid; + NamespaceString nss; + if (nssOrUUID.uuid()) { + uuid = *(nssOrUUID.uuid()); + nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(*uuid); + // An empty namespace indicates that the collection has been dropped. Treat it as unsharded + // and mark the fields as final. + if (nss.isEmpty()) { + return {{"_id"}, true}; + } + } else if (nssOrUUID.nss()) { + nss = *(nssOrUUID.nss()); + } + + // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache + // to determine whether the collection is sharded in the first place. + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + const bool collectionIsSharded = catalogCache && [&]() { + auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); + return routingInfo.isOK() && routingInfo.getValue().cm(); + }(); + + // Collection exists and is not sharded, mark as not final. + if (!collectionIsSharded) { + return {{"_id"}, false}; + } + + auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); + }(); + + // If the UUID is set in 'nssOrUuid', check that the UUID in the ScopedCollectionMetadata + // matches. Otherwise, this implies that the collection has been dropped and recreated as + // sharded. + if (!scm->isSharded() || (uuid && !scm->uuidMatches(*uuid))) { + return {{"_id"}, false}; + } + + // Unpack the shard key. + std::vector<FieldPath> result; + bool gotId = false; + for (auto& field : scm->getKeyPatternFields()) { + result.emplace_back(field->dottedField()); + gotId |= (result.back().fullPath() == "_id"); + } + if (!gotId) { // If not part of the shard key, "_id" comes last. + result.emplace_back("_id"); + } + // Collection is now sharded so the document key fields will never change, mark as final. + return {result, true}; +} + +void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs) { + BatchedCommandResponse response; + BatchWriteExecStats stats; + + ClusterWriter::write( + expCtx->opCtx, + BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)), + &stats, + &response); + // TODO SERVER-35403: Add more context for which shard produced the error. + uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); +} + +void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi) { + BatchedCommandResponse response; + BatchWriteExecStats stats; + ClusterWriter::write(expCtx->opCtx, + BatchedCommandRequest(buildUpdateOp(ns, + std::move(queries), + std::move(updates), + upsert, + multi, + expCtx->bypassDocumentValidation)), + &stats, + &response); + // TODO SERVER-35403: Add more context for which shard produced the error. + uassertStatusOKWithContext(response.toStatus(), "Update failed: "); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h new file mode 100644 index 00000000000..4685e27735d --- /dev/null +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface_standalone.h" + +namespace mongo { + +/** + * Specialized version of the MongoDInterface when this node is a shard server. + */ +class MongoInterfaceShardServer final : public MongoInterfaceStandalone { +public: + using MongoInterfaceStandalone::MongoInterfaceStandalone; + + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields( + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const final; + + /** + * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, + * routing, stale config handling, etc. + */ + void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs) final; + + /** + * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, + * routing, stale config handling, etc. + */ + void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi) final; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 0141726f22e..cb1a2a9a803 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/mongod_process_interface.h" +#include "mongo/db/pipeline/process_interface_standalone.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" @@ -148,32 +148,25 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -// static -std::shared_ptr<MongoProcessInterface> MongoDInterface::create(OperationContext* opCtx) { - return ShardingState::get(opCtx)->enabled() - ? std::make_shared<MongoDInterfaceShardServer>(opCtx) - : std::make_shared<MongoDInterface>(opCtx); -} - -MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {} +MongoInterfaceStandalone::MongoInterfaceStandalone(OperationContext* opCtx) : _client(opCtx) {} -void MongoDInterface::setOperationContext(OperationContext* opCtx) { +void MongoInterfaceStandalone::setOperationContext(OperationContext* opCtx) { _client.setOpCtx(opCtx); } -DBClientBase* MongoDInterface::directClient() { +DBClientBase* MongoInterfaceStandalone::directClient() { return &_client; } -bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { +bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) { AutoGetCollectionForReadCommand autoColl(opCtx, nss); auto const css = CollectionShardingState::get(opCtx, nss); return css->getMetadata(opCtx)->isSharded(); } -void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs) { +void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs) { auto writeResults = performInserts( expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); @@ -182,12 +175,12 @@ void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expC uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: "); } -void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, - bool upsert, - bool multi) { +void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi) { auto writeResults = performUpdates(expCtx->opCtx, buildUpdateOp(ns, std::move(queries), @@ -201,8 +194,8 @@ void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expC uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: "); } -CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) { +CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) { AutoGetCollectionForReadCommand autoColl(opCtx, ns); Collection* collection = autoColl.getCollection(); @@ -214,32 +207,32 @@ CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx, return collection->infoCache()->getIndexUsageStats(); } -void MongoDInterface::appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const { +void MongoInterfaceStandalone::appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const { Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); } -Status MongoDInterface::appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const { +Status MongoInterfaceStandalone::appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const { return appendCollectionStorageStats(opCtx, nss, param, builder); } -Status MongoDInterface::appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const { +Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const { return appendCollectionRecordCount(opCtx, nss, builder); } -BSONObj MongoDInterface::getCollectionOptions(const NamespaceString& nss) { +BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) { const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); } -void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged( +void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( OperationContext* opCtx, const BSONObj& renameCommandObj, const NamespaceString& targetNs, @@ -272,7 +265,7 @@ void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged( _client.runCommand("admin", renameCommandObj, info)); } -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoDInterface::makePipeline( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { @@ -294,7 +287,7 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoDInterface::makePipe return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; } -Status MongoDInterface::attachCursorSourceToPipeline( +Status MongoInterfaceStandalone::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { invariant(pipeline->getSources().empty() || !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); @@ -331,7 +324,7 @@ Status MongoDInterface::attachCursorSourceToPipeline( return Status::OK(); } -std::string MongoDInterface::getShardName(OperationContext* opCtx) const { +std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const { if (ShardingState::get(opCtx)->enabled()) { return ShardingState::get(opCtx)->shardId().toString(); } @@ -339,71 +332,17 @@ std::string MongoDInterface::getShardName(OperationContext* opCtx) const { return std::string(); } -std::pair<std::vector<FieldPath>, bool> MongoDInterface::collectDocumentKeyFields( +std::pair<std::vector<FieldPath>, bool> MongoInterfaceStandalone::collectDocumentKeyFields( OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { - return {{"_id"}, false}; // Nothing is sharded. - } - boost::optional<UUID> uuid; - NamespaceString nss; - if (nssOrUUID.uuid()) { - uuid = *(nssOrUUID.uuid()); - nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(*uuid); - // An empty namespace indicates that the collection has been dropped. Treat it as unsharded - // and mark the fields as final. - if (nss.isEmpty()) { - return {{"_id"}, true}; - } - } else if (nssOrUUID.nss()) { - nss = *(nssOrUUID.nss()); - } - - // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache - // to determine whether the collection is sharded in the first place. - auto catalogCache = Grid::get(opCtx)->catalogCache(); - - const bool collectionIsSharded = catalogCache && [&]() { - auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); - return routingInfo.isOK() && routingInfo.getValue().cm(); - }(); - - // Collection exists and is not sharded, mark as not final. - if (!collectionIsSharded) { - return {{"_id"}, false}; - } - - auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { - AutoGetCollection autoColl(opCtx, nss, MODE_IS); - return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); - }(); - - // If the UUID is set in 'nssOrUuid', check that the UUID in the ScopedCollectionMetadata - // matches. Otherwise, this implies that the collection has been dropped and recreated as - // sharded. - if (!scm->isSharded() || (uuid && !scm->uuidMatches(*uuid))) { - return {{"_id"}, false}; - } - - // Unpack the shard key. - std::vector<FieldPath> result; - bool gotId = false; - for (auto& field : scm->getKeyPatternFields()) { - result.emplace_back(field->dottedField()); - gotId |= (result.back().fullPath() == "_id"); - } - if (!gotId) { // If not part of the shard key, "_id" comes last. - result.emplace_back("_id"); - } - // Collection is now sharded so the document key fields will never change, mark as final. - return {result, true}; + return {{"_id"}, false}; // Nothing is sharded. } -std::vector<GenericCursor> MongoDInterface::getIdleCursors( +std::vector<GenericCursor> MongoInterfaceStandalone::getIdleCursors( const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { return CursorManager::getIdleCursors(expCtx->opCtx, userMode); } -boost::optional<Document> MongoDInterface::lookupSingleDocument( +boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID collectionUUID, @@ -438,7 +377,7 @@ boost::optional<Document> MongoDInterface::lookupSingleDocument( return lookedUpDocument; } -BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) { +BackupCursorState MongoInterfaceStandalone::openBackupCursor(OperationContext* opCtx) { auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); if (backupCursorHooks->enabled()) { return backupCursorHooks->openBackupCursor(opCtx); @@ -447,7 +386,7 @@ BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) { } } -void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) { +void MongoInterfaceStandalone::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) { auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); if (backupCursorHooks->enabled()) { backupCursorHooks->closeBackupCursor(opCtx, cursorId); @@ -456,7 +395,7 @@ void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t c } } -std::vector<BSONObj> MongoDInterface::getMatchingPlanCacheEntryStats( +std::vector<BSONObj> MongoInterfaceStandalone::getMatchingPlanCacheEntryStats( OperationContext* opCtx, const NamespaceString& nss, const MatchExpression* matchExp) const { const auto serializer = [](const PlanCacheEntry& entry) { BSONObjBuilder out; @@ -481,7 +420,7 @@ std::vector<BSONObj> MongoDInterface::getMatchingPlanCacheEntryStats( return planCache->getMatchingStats(serializer, predicate); } -bool MongoDInterface::uniqueKeyIsSupportedByIndex( +bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, const std::set<FieldPath>& uniqueKeyPaths) const { @@ -509,9 +448,8 @@ bool MongoDInterface::uniqueKeyIsSupportedByIndex( return false; } -BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps) const { +BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( + OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { BSONObjBuilder builder; CurOp::reportCurrentOpForClient( @@ -534,9 +472,9 @@ BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx, return builder.obj(); } -void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const { +void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const { auto sessionCatalog = SessionCatalog::get(opCtx); const bool authEnabled = @@ -562,7 +500,7 @@ void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, }); } -std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollator( +std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefaultCollator( OperationContext* opCtx, StringData dbName, UUID collectionUUID) { auto it = _collatorCache.find(collectionUUID); if (it == _collatorCache.end()) { @@ -586,40 +524,4 @@ std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollato return collator ? collator->clone() : nullptr; } -void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs) { - BatchedCommandResponse response; - BatchWriteExecStats stats; - - ClusterWriter::write( - expCtx->opCtx, - BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)), - &stats, - &response); - // TODO SERVER-35403: Add more context for which shard produced the error. - uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); -} - -void MongoDInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, - bool upsert, - bool multi) { - BatchedCommandResponse response; - BatchWriteExecStats stats; - ClusterWriter::write(expCtx->opCtx, - BatchedCommandRequest(buildUpdateOp(ns, - std::move(queries), - std::move(updates), - upsert, - multi, - expCtx->bypassDocumentValidation)), - &stats, - &response); - // TODO SERVER-35403: Add more context for which shard produced the error. - uassertStatusOKWithContext(response.toStatus(), "Update failed: "); -} - } // namespace mongo diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/process_interface_standalone.h index cb9614f2172..981ca2b35c3 100644 --- a/src/mongo/db/pipeline/mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -38,26 +38,26 @@ namespace mongo { * Class to provide access to mongod-specific implementations of methods required by some * document sources. */ -class MongoDInterface : public MongoProcessCommon { +class MongoInterfaceStandalone : public MongoProcessCommon { public: - static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx); + // static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx); - MongoDInterface(OperationContext* opCtx); + MongoInterfaceStandalone(OperationContext* opCtx); - virtual ~MongoDInterface() = default; + virtual ~MongoInterfaceStandalone() = default; void setOperationContext(OperationContext* opCtx) final; DBClientBase* directClient() final; bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; - virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs); - virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, - bool upsert, - bool multi); + void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs) override; + void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& queries, + std::vector<BSONObj>&& updates, + bool upsert, + bool multi) override; CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; void appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, @@ -84,7 +84,7 @@ public: Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields( - OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const final; + OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const override; boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -129,31 +129,4 @@ private: std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache; }; -/** - * Specialized version of the MongoDInterface when this node is a shard server. - */ -class MongoDInterfaceShardServer final : public MongoDInterface { -public: - using MongoDInterface::MongoDInterface; - - /** - * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ - void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs) final; - - /** - * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, - * routing, stale config handling, etc. - */ - void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& queries, - std::vector<BSONObj>&& updates, - bool upsert, - bool multi) final; -}; - } // namespace mongo diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript index 26bd7f7f80a..5b54fadee8d 100644 --- a/src/mongo/embedded/SConscript +++ b/src/mongo/embedded/SConscript @@ -66,6 +66,7 @@ env.Library( 'embedded_options_parser_init.cpp', 'logical_session_cache_factory_embedded.cpp', 'periodic_runner_embedded.cpp', + 'process_interface_factory_embedded.cpp', 'replication_coordinator_embedded.cpp', 'service_entry_point_embedded.cpp', ], @@ -84,6 +85,7 @@ env.Library( '$BUILD_DIR/mongo/db/logical_session_cache', '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/op_observer_impl', + '$BUILD_DIR/mongo/db/pipeline/process_interface_standalone', '$BUILD_DIR/mongo/db/repair_database_and_check_version', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/replica_set_messages', diff --git a/src/mongo/embedded/process_interface_factory_embedded.cpp b/src/mongo/embedded/process_interface_factory_embedded.cpp new file mode 100644 index 00000000000..1a59855ccfe --- /dev/null +++ b/src/mongo/embedded/process_interface_factory_embedded.cpp @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface_standalone.h" + +namespace mongo { + +MONGO_REGISTER_SHIM(MongoProcessInterface::create) +(OperationContext* opCtx)->std::shared_ptr<MongoProcessInterface> { + return std::make_shared<MongoInterfaceStandalone>(opCtx); +} + +} // namespace mongo |