diff options
author | Mihai Andrei <mihai.andrei@mongodb.com> | 2020-02-06 15:58:11 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-13 19:18:40 +0000 |
commit | 00477b903ef7b06b4e561f8cca06f2ad580815a5 (patch) | |
tree | 455f900444ea9bb11576e44e0de5398e0c7b34d9 /src/mongo/db/pipeline | |
parent | cd915cc25b0b27f8527089ac0b7c645b9c76cc42 (diff) | |
download | mongo-00477b903ef7b06b4e561f8cca06f2ad580815a5.tar.gz |
SERVER-45963 Introduce ReplicaSetNodeProcessInterface and a new TaskExecutor for it to use
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/accumulator_js_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_javascript_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp | 117 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp | 181 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h | 95 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp | 160 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/standalone_process_interface.h | 81 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp (renamed from src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp) | 6 |
11 files changed, 534 insertions, 161 deletions
diff --git a/src/mongo/db/pipeline/accumulator_js_test.cpp b/src/mongo/db/pipeline/accumulator_js_test.cpp index 6c5f2b6127b..8497e1ab9c8 100644 --- a/src/mongo/db/pipeline/accumulator_js_test.cpp +++ b/src/mongo/db/pipeline/accumulator_js_test.cpp @@ -35,7 +35,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/accumulator_js_reduce.h" #include "mongo/db/pipeline/expression_context_for_test.h" -#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" +#include "mongo/db/pipeline/process_interface/standalone_process_interface.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/dbtests/dbtests.h" #include "mongo/scripting/engine.h" @@ -47,7 +47,7 @@ class MapReduceFixture : public ServiceContextMongoDTest { protected: MapReduceFixture() : _expCtx((new ExpressionContextForTest())) { _expCtx->mongoProcessInterface = - std::make_shared<NonShardServerProcessInterface>(_expCtx->opCtx); + std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx); } boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() { diff --git a/src/mongo/db/pipeline/expression_javascript_test.cpp b/src/mongo/db/pipeline/expression_javascript_test.cpp index f11d4c0e7ef..10203cc523d 100644 --- a/src/mongo/db/pipeline/expression_javascript_test.cpp +++ b/src/mongo/db/pipeline/expression_javascript_test.cpp @@ -34,7 +34,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/expression_function.h" -#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" +#include "mongo/db/pipeline/process_interface/standalone_process_interface.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/scripting/engine.h" @@ -48,7 +48,7 @@ protected: MapReduceFixture() : _expCtx((new ExpressionContextForTest())), _vps(_expCtx->variablesParseState) { _expCtx->mongoProcessInterface = - std::make_shared<NonShardServerProcessInterface>(_expCtx->opCtx); + std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx); } boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() { diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript index 0a19c65c996..7f7697bcf6b 100644 --- a/src/mongo/db/pipeline/process_interface/SConscript +++ b/src/mongo/db/pipeline/process_interface/SConscript @@ -34,6 +34,8 @@ env.Library( source=[ 'common_mongod_process_interface.cpp', 'non_shardsvr_process_interface.cpp', + 'replica_set_node_process_interface.cpp', + 'standalone_process_interface.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/ops/write_ops_exec', @@ -100,7 +102,7 @@ env.CppUnitTest( target='process_interface_test', source=[ 'mongos_process_interface_test.cpp', - 'shardsvr_process_interface_test.cpp', + 'standalone_process_interface_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/query/query_test_service_context', diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp index 3845cfcd3f2..744222380f9 100644 --- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp +++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp @@ -30,8 +30,10 @@ #include "mongo/platform/basic.h" #include "mongo/base/shim.h" -#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" #include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" +#include "mongo/db/pipeline/process_interface/standalone_process_interface.h" #include "mongo/db/s/sharding_state.h" namespace mongo { @@ -40,8 +42,11 @@ namespace { std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(OperationContext* opCtx) { if (ShardingState::get(opCtx)->enabled()) { return std::make_shared<ShardServerProcessInterface>(opCtx); + } else if (getTestCommandsEnabled()) { + if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx)) + return std::make_shared<ReplicaSetNodeProcessInterface>(opCtx, executor); } - return std::make_shared<NonShardServerProcessInterface>(opCtx); + return std::make_shared<StandaloneProcessInterface>(opCtx); } auto mongoProcessInterfaceCreateRegistration = MONGO_WEAK_FUNCTION_REGISTRATION( diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index 5b2f64e4802..196fcb818b4 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -33,7 +33,6 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" -#include "mongo/db/catalog/list_indexes.h" #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -42,122 +41,6 @@ namespace mongo { -Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { - auto writeResults = performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); - - // Need to check each result in the batch since the writes are unordered. - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - } - return Status::OK(); -} - -StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - BatchedObjects&& batch, - const WriteConcernOptions& wc, - UpsertType upsert, - bool multi, - boost::optional<OID> targetEpoch) { - auto writeResults = - performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - - // Need to check each result in the batch since the writes are unordered. - UpdateResult updateResult; - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - - updateResult.nMatched += result.getValue().getN(); - updateResult.nModified += result.getValue().getNModified(); - } - return updateResult; -} - -std::list<BSONObj> NonShardServerProcessInterface::getIndexSpecs(OperationContext* opCtx, - const NamespaceString& ns, - bool includeBuildUUIDs) { - return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs); -} - -void NonShardServerProcessInterface::createIndexesOnEmptyCollection( - OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) { - AutoGetCollection autoColl(opCtx, ns, MODE_X); - writeConflictRetry( - opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] { - auto collection = autoColl.getCollection(); - invariant(collection, - str::stream() << "Failed to create indexes for aggregation because " - "collection does not exist: " - << ns << ": " << BSON("indexes" << indexSpecs)); - - invariant(0U == collection->numRecords(opCtx), - str::stream() << "Expected empty collection for index creation: " << ns - << ": numRecords: " << collection->numRecords(opCtx) << ": " - << BSON("indexes" << indexSpecs)); - - // Secondary index builds do not filter existing indexes so we have to do this on the - // primary. - auto removeIndexBuildsToo = false; - auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes( - opCtx, indexSpecs, removeIndexBuildsToo); - if (filteredIndexes.empty()) { - return; - } - - WriteUnitOfWork wuow(opCtx); - IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( - opCtx, collection->uuid(), filteredIndexes, false // fromMigrate - ); - wuow.commit(); - }); -} -void NonShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) { - NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String()); - doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, - sourceNs, - targetNs, - renameCommandObj["dropTarget"].trueValue(), - renameCommandObj["stayTemp"].trueValue(), - originalIndexes, - originalCollectionOptions); -} - -void NonShardServerProcessInterface::createCollection(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj) { - uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj)); -} - -void NonShardServerProcessInterface::dropCollection(OperationContext* opCtx, - const NamespaceString& ns) { - BSONObjBuilder result; - uassertStatusOK(mongo::dropCollection( - opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); -} - -std::unique_ptr<Pipeline, PipelineDeleter> -NonShardServerProcessInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - bool allowTargetingShards) { - return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); -} - std::pair<std::vector<FieldPath>, bool> NonShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection( OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const { diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index c79db4bbcf6..1c226a44414 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -45,44 +45,10 @@ class NonShardServerProcessInterface : public CommonMongodProcessInterface { public: using CommonMongodProcessInterface::CommonMongodProcessInterface; - virtual ~NonShardServerProcessInterface() = default; - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { return false; } - Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) override; - StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - BatchedObjects&& batch, - const WriteConcernOptions& wc, - UpsertType upsert, - bool multi, - boost::optional<OID> targetEpoch) override; - - std::list<BSONObj> getIndexSpecs(OperationContext* opCtx, - const NamespaceString& ns, - bool includeBuildUUIDs); - void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes); - void createCollection(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj); - void dropCollection(OperationContext* opCtx, const NamespaceString& collection); - void createIndexesOnEmptyCollection(OperationContext* opCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& indexSpecs); - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline, - bool allowTargetingShards) override; std::unique_ptr<ShardFilterer> getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { // We'll never do shard filtering on a standalone. diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp new file mode 100644 index 00000000000..44c381834e6 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -0,0 +1,181 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h" + +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/drop_collection.h" +#include "mongo/db/catalog/list_indexes.h" +#include "mongo/db/catalog/rename_collection.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/index_builds_coordinator.h" + +namespace mongo { + +namespace { +const auto replicaSetNodeExecutor = + ServiceContext::declareDecoration<std::unique_ptr<executor::TaskExecutor>>(); +} // namespace + +executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( + ServiceContext* service) { + return replicaSetNodeExecutor(service).get(); +} + +executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor( + OperationContext* opCtx) { + return getReplicaSetNodeExecutor(opCtx->getServiceContext()); +} + +void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor( + ServiceContext* service, std::unique_ptr<executor::TaskExecutor> executor) { + replicaSetNodeExecutor(service) = std::move(executor); +} + +Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { + auto writeResults = performInserts( + expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + + // Need to check each result in the batch since the writes are unordered. + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + } + return Status::OK(); +} + +StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + BatchedObjects&& batch, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) { + auto writeResults = + performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + + // Need to check each result in the batch since the writes are unordered. + UpdateResult updateResult; + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + + updateResult.nMatched += result.getValue().getN(); + updateResult.nModified += result.getValue().getNModified(); + } + return updateResult; +} + +std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) { + return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs); +} + +void ReplicaSetNodeProcessInterface::createIndexesOnEmptyCollection( + OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) { + AutoGetCollection autoColl(opCtx, ns, MODE_X); + writeConflictRetry( + opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] { + auto collection = autoColl.getCollection(); + invariant(collection, + str::stream() << "Failed to create indexes for aggregation because " + "collection does not exist: " + << ns << ": " << BSON("indexes" << indexSpecs)); + + invariant(0U == collection->numRecords(opCtx), + str::stream() << "Expected empty collection for index creation: " << ns + << ": numRecords: " << collection->numRecords(opCtx) << ": " + << BSON("indexes" << indexSpecs)); + + // Secondary index builds do not filter existing indexes so we have to do this on the + // primary. + auto removeIndexBuildsToo = false; + auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes( + opCtx, indexSpecs, removeIndexBuildsToo); + if (filteredIndexes.empty()) { + return; + } + + WriteUnitOfWork wuow(opCtx); + IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( + opCtx, collection->uuid(), filteredIndexes, false // fromMigrate + ); + wuow.commit(); + }); +} +void ReplicaSetNodeProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) { + NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String()); + doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, + sourceNs, + targetNs, + renameCommandObj["dropTarget"].trueValue(), + renameCommandObj["stayTemp"].trueValue(), + originalIndexes, + originalCollectionOptions); +} + +void ReplicaSetNodeProcessInterface::createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { + uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj)); +} + +void ReplicaSetNodeProcessInterface::dropCollection(OperationContext* opCtx, + const NamespaceString& ns) { + BSONObjBuilder result; + uassertStatusOK(mongo::dropCollection( + opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); +} + +std::unique_ptr<Pipeline, PipelineDeleter> +ReplicaSetNodeProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h new file mode 100644 index 00000000000..992e28b6839 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" + +namespace mongo { + +/** + * An implementation of the MongoProcessInterface used on replica set nodes when sharding is not + * enabled. + */ +class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterface { +public: + using NonShardServerProcessInterface::NonShardServerProcessInterface; + + static executor::TaskExecutor* getReplicaSetNodeExecutor(ServiceContext* service); + + static executor::TaskExecutor* getReplicaSetNodeExecutor(OperationContext* opCtx); + + static void setReplicaSetNodeExecutor(ServiceContext* service, + std::unique_ptr<executor::TaskExecutor> executor); + + ReplicaSetNodeProcessInterface(OperationContext* opCtx, executor::TaskExecutor* executor) + : NonShardServerProcessInterface(opCtx), _executor(executor) {} + + virtual ~ReplicaSetNodeProcessInterface() = default; + + Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) final; + StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + BatchedObjects&& batch, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) override; + + std::list<BSONObj> getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs); + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes); + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj); + void dropCollection(OperationContext* opCtx, const NamespaceString& collection); + void createIndexesOnEmptyCollection(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& indexSpecs); + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline, + bool allowTargetingShards) override; + +private: + executor::TaskExecutor* _executor; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp b/src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp new file mode 100644 index 00000000000..f4a805a595f --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface/standalone_process_interface.h" + +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/drop_collection.h" +#include "mongo/db/catalog/list_indexes.h" +#include "mongo/db/catalog/rename_collection.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/index_builds_coordinator.h" + +namespace mongo { + +Status StandaloneProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { + auto writeResults = performInserts( + expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + + // Need to check each result in the batch since the writes are unordered. + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + } + return Status::OK(); +} + +StatusWith<MongoProcessInterface::UpdateResult> StandaloneProcessInterface::update( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + BatchedObjects&& batch, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) { + auto writeResults = + performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + + // Need to check each result in the batch since the writes are unordered. + UpdateResult updateResult; + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + + updateResult.nMatched += result.getValue().getN(); + updateResult.nModified += result.getValue().getNModified(); + } + return updateResult; +} + +std::list<BSONObj> StandaloneProcessInterface::getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) { + return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs); +} + +void StandaloneProcessInterface::createIndexesOnEmptyCollection( + OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) { + AutoGetCollection autoColl(opCtx, ns, MODE_X); + writeConflictRetry( + opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] { + auto collection = autoColl.getCollection(); + invariant(collection, + str::stream() << "Failed to create indexes for aggregation because " + "collection does not exist: " + << ns << ": " << BSON("indexes" << indexSpecs)); + + invariant(0U == collection->numRecords(opCtx), + str::stream() << "Expected empty collection for index creation: " << ns + << ": numRecords: " << collection->numRecords(opCtx) << ": " + << BSON("indexes" << indexSpecs)); + + // Secondary index builds do not filter existing indexes so we have to do this on the + // primary. + auto removeIndexBuildsToo = false; + auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes( + opCtx, indexSpecs, removeIndexBuildsToo); + if (filteredIndexes.empty()) { + return; + } + + WriteUnitOfWork wuow(opCtx); + IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( + opCtx, collection->uuid(), filteredIndexes, false // fromMigrate + ); + wuow.commit(); + }); +} +void StandaloneProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) { + NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String()); + doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, + sourceNs, + targetNs, + renameCommandObj["dropTarget"].trueValue(), + renameCommandObj["stayTemp"].trueValue(), + originalIndexes, + originalCollectionOptions); +} + +void StandaloneProcessInterface::createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { + uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj)); +} + +void StandaloneProcessInterface::dropCollection(OperationContext* opCtx, + const NamespaceString& ns) { + BSONObjBuilder result; + uassertStatusOK(mongo::dropCollection( + opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); +} + +std::unique_ptr<Pipeline, PipelineDeleter> StandaloneProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h new file mode 100644 index 00000000000..3fa54515599 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" + +namespace mongo { + +/** + * Class to provide access to standalone specific implementations of methods required by some + * document sources. + */ +class StandaloneProcessInterface : public NonShardServerProcessInterface { +public: + using NonShardServerProcessInterface::NonShardServerProcessInterface; + + virtual ~StandaloneProcessInterface() = default; + + Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) override; + StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + BatchedObjects&& batch, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) override; + std::list<BSONObj> getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs); + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes); + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj); + void dropCollection(OperationContext* opCtx, const NamespaceString& collection); + void createIndexesOnEmptyCollection(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& indexSpecs); + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline, + bool allowTargetingShards) override; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp index 14df45018df..5d7cc9d535a 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp +++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp @@ -30,15 +30,15 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" +#include "mongo/db/pipeline/process_interface/standalone_process_interface.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { -class MongoProcessInterfaceForTest : public NonShardServerProcessInterface { +class MongoProcessInterfaceForTest : public StandaloneProcessInterface { public: - using NonShardServerProcessInterface::NonShardServerProcessInterface; + using StandaloneProcessInterface::StandaloneProcessInterface; bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, |