summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMihai Andrei <mihai.andrei@mongodb.com>2020-02-06 15:58:11 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-13 19:18:40 +0000
commit00477b903ef7b06b4e561f8cca06f2ad580815a5 (patch)
tree455f900444ea9bb11576e44e0de5398e0c7b34d9 /src/mongo
parentcd915cc25b0b27f8527089ac0b7c645b9c76cc42 (diff)
downloadmongo-00477b903ef7b06b4e561f8cca06f2ad580815a5.tar.gz
SERVER-45963 Introduce ReplicaSetNodeProcessInterface and a new TaskExecutor for it to use
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/db.cpp22
-rw-r--r--src/mongo/db/pipeline/accumulator_js_test.cpp4
-rw-r--r--src/mongo/db/pipeline/expression_javascript_test.cpp4
-rw-r--r--src/mongo/db/pipeline/process_interface/SConscript4
-rw-r--r--src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp9
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp117
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h34
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp181
-rw-r--r--src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h95
-rw-r--r--src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp160
-rw-r--r--src/mongo/db/pipeline/process_interface/standalone_process_interface.h81
-rw-r--r--src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp (renamed from src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp)6
-rw-r--r--src/mongo/embedded/process_interface_factory_embedded.cpp4
13 files changed, 558 insertions, 163 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 59da6fe85a1..6444d5ecda4 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -102,6 +102,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/periodic_runner_job_abort_expired_transactions.h"
#include "mongo/db/periodic_runner_job_decrease_snapshot_cache_pressure.h"
+#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/read_write_concern_defaults_cache_lookup_mongod.h"
#include "mongo/db/repair_database_and_check_version.h"
@@ -878,6 +879,21 @@ void setUpCatalog(ServiceContext* serviceContext) {
Collection::Factory::set(serviceContext, std::make_unique<CollectionImpl::FactoryImpl>());
}
+auto makeReplicaSetNodeExecutor(ServiceContext* serviceContext) {
+ ThreadPool::Options tpOptions;
+ tpOptions.threadNamePrefix = "ReplNodeDbWorker-";
+ tpOptions.poolName = "ReplNodeDbWorkerThreadPool";
+ tpOptions.maxThreads = ThreadPool::Options::kUnlimited;
+ tpOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+ // TODO SERVER-45966 Add necessary hooks.
+ auto hookList = nullptr;
+ return std::make_unique<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(tpOptions),
+ executor::makeNetworkInterface("ReplNodeDbWorkerNetwork", nullptr, std::move(hookList)));
+}
+
auto makeReplicationExecutor(ServiceContext* serviceContext) {
ThreadPool::Options tpOptions;
tpOptions.threadNamePrefix = "ReplCoord-";
@@ -928,6 +944,12 @@ void setUpReplication(ServiceContext* serviceContext) {
replicationProcess,
storageInterface,
SecureRandom().nextInt64());
+ // Only create a ReplicaSetNodeExecutor if sharding is disabled and replication is enabled.
+ // Note that sharding sets up its own executors for scheduling work to remote nodes.
+ if (!ShardingState::get(serviceContext)->enabled() && replCoord->isReplEnabled())
+ ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
+ serviceContext, makeReplicaSetNodeExecutor(serviceContext));
+
repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
repl::setOplogCollectionName(serviceContext);
diff --git a/src/mongo/db/pipeline/accumulator_js_test.cpp b/src/mongo/db/pipeline/accumulator_js_test.cpp
index 6c5f2b6127b..8497e1ab9c8 100644
--- a/src/mongo/db/pipeline/accumulator_js_test.cpp
+++ b/src/mongo/db/pipeline/accumulator_js_test.cpp
@@ -35,7 +35,7 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/accumulator_js_reduce.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
-#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/dbtests/dbtests.h"
#include "mongo/scripting/engine.h"
@@ -47,7 +47,7 @@ class MapReduceFixture : public ServiceContextMongoDTest {
protected:
MapReduceFixture() : _expCtx((new ExpressionContextForTest())) {
_expCtx->mongoProcessInterface =
- std::make_shared<NonShardServerProcessInterface>(_expCtx->opCtx);
+ std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx);
}
boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() {
diff --git a/src/mongo/db/pipeline/expression_javascript_test.cpp b/src/mongo/db/pipeline/expression_javascript_test.cpp
index f11d4c0e7ef..10203cc523d 100644
--- a/src/mongo/db/pipeline/expression_javascript_test.cpp
+++ b/src/mongo/db/pipeline/expression_javascript_test.cpp
@@ -34,7 +34,7 @@
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/expression_function.h"
-#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/scripting/engine.h"
@@ -48,7 +48,7 @@ protected:
MapReduceFixture()
: _expCtx((new ExpressionContextForTest())), _vps(_expCtx->variablesParseState) {
_expCtx->mongoProcessInterface =
- std::make_shared<NonShardServerProcessInterface>(_expCtx->opCtx);
+ std::make_shared<StandaloneProcessInterface>(_expCtx->opCtx);
}
boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() {
diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript
index 0a19c65c996..7f7697bcf6b 100644
--- a/src/mongo/db/pipeline/process_interface/SConscript
+++ b/src/mongo/db/pipeline/process_interface/SConscript
@@ -34,6 +34,8 @@ env.Library(
source=[
'common_mongod_process_interface.cpp',
'non_shardsvr_process_interface.cpp',
+ 'replica_set_node_process_interface.cpp',
+ 'standalone_process_interface.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
@@ -100,7 +102,7 @@ env.CppUnitTest(
target='process_interface_test',
source=[
'mongos_process_interface_test.cpp',
- 'shardsvr_process_interface_test.cpp',
+ 'standalone_process_interface_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/query/query_test_service_context',
diff --git a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
index 3845cfcd3f2..744222380f9 100644
--- a/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp
@@ -30,8 +30,10 @@
#include "mongo/platform/basic.h"
#include "mongo/base/shim.h"
-#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/db/commands/test_commands_enabled.h"
+#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h"
+#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
#include "mongo/db/s/sharding_state.h"
namespace mongo {
@@ -40,8 +42,11 @@ namespace {
std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(OperationContext* opCtx) {
if (ShardingState::get(opCtx)->enabled()) {
return std::make_shared<ShardServerProcessInterface>(opCtx);
+ } else if (getTestCommandsEnabled()) {
+ if (auto executor = ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(opCtx))
+ return std::make_shared<ReplicaSetNodeProcessInterface>(opCtx, executor);
}
- return std::make_shared<NonShardServerProcessInterface>(opCtx);
+ return std::make_shared<StandaloneProcessInterface>(opCtx);
}
auto mongoProcessInterfaceCreateRegistration = MONGO_WEAK_FUNCTION_REGISTRATION(
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index 5b2f64e4802..196fcb818b4 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
-#include "mongo/db/catalog/list_indexes.h"
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -42,122 +41,6 @@
namespace mongo {
-Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) {
- auto writeResults = performInserts(
- expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
-
- // Need to check each result in the batch since the writes are unordered.
- for (const auto& result : writeResults.results) {
- if (result.getStatus() != Status::OK()) {
- return result.getStatus();
- }
- }
- return Status::OK();
-}
-
-StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- BatchedObjects&& batch,
- const WriteConcernOptions& wc,
- UpsertType upsert,
- bool multi,
- boost::optional<OID> targetEpoch) {
- auto writeResults =
- performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
-
- // Need to check each result in the batch since the writes are unordered.
- UpdateResult updateResult;
- for (const auto& result : writeResults.results) {
- if (result.getStatus() != Status::OK()) {
- return result.getStatus();
- }
-
- updateResult.nMatched += result.getValue().getN();
- updateResult.nModified += result.getValue().getNModified();
- }
- return updateResult;
-}
-
-std::list<BSONObj> NonShardServerProcessInterface::getIndexSpecs(OperationContext* opCtx,
- const NamespaceString& ns,
- bool includeBuildUUIDs) {
- return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs);
-}
-
-void NonShardServerProcessInterface::createIndexesOnEmptyCollection(
- OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) {
- AutoGetCollection autoColl(opCtx, ns, MODE_X);
- writeConflictRetry(
- opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] {
- auto collection = autoColl.getCollection();
- invariant(collection,
- str::stream() << "Failed to create indexes for aggregation because "
- "collection does not exist: "
- << ns << ": " << BSON("indexes" << indexSpecs));
-
- invariant(0U == collection->numRecords(opCtx),
- str::stream() << "Expected empty collection for index creation: " << ns
- << ": numRecords: " << collection->numRecords(opCtx) << ": "
- << BSON("indexes" << indexSpecs));
-
- // Secondary index builds do not filter existing indexes so we have to do this on the
- // primary.
- auto removeIndexBuildsToo = false;
- auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes(
- opCtx, indexSpecs, removeIndexBuildsToo);
- if (filteredIndexes.empty()) {
- return;
- }
-
- WriteUnitOfWork wuow(opCtx);
- IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection(
- opCtx, collection->uuid(), filteredIndexes, false // fromMigrate
- );
- wuow.commit();
- });
-}
-void NonShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
- OperationContext* opCtx,
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) {
- NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String());
- doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx,
- sourceNs,
- targetNs,
- renameCommandObj["dropTarget"].trueValue(),
- renameCommandObj["stayTemp"].trueValue(),
- originalIndexes,
- originalCollectionOptions);
-}
-
-void NonShardServerProcessInterface::createCollection(OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& cmdObj) {
- uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj));
-}
-
-void NonShardServerProcessInterface::dropCollection(OperationContext* opCtx,
- const NamespaceString& ns) {
- BSONObjBuilder result;
- uassertStatusOK(mongo::dropCollection(
- opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops));
-}
-
-std::unique_ptr<Pipeline, PipelineDeleter>
-NonShardServerProcessInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* ownedPipeline,
- bool allowTargetingShards) {
- return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline);
-}
-
std::pair<std::vector<FieldPath>, bool>
NonShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(
OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const {
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index c79db4bbcf6..1c226a44414 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -45,44 +45,10 @@ class NonShardServerProcessInterface : public CommonMongodProcessInterface {
public:
using CommonMongodProcessInterface::CommonMongodProcessInterface;
- virtual ~NonShardServerProcessInterface() = default;
-
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
return false;
}
- Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc,
- boost::optional<OID> targetEpoch) override;
- StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- BatchedObjects&& batch,
- const WriteConcernOptions& wc,
- UpsertType upsert,
- bool multi,
- boost::optional<OID> targetEpoch) override;
-
- std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
- const NamespaceString& ns,
- bool includeBuildUUIDs);
- void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes);
- void createCollection(OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& cmdObj);
- void dropCollection(OperationContext* opCtx, const NamespaceString& collection);
- void createIndexesOnEmptyCollection(OperationContext* opCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& indexSpecs);
- std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline,
- bool allowTargetingShards) override;
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override {
// We'll never do shard filtering on a standalone.
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
new file mode 100644
index 00000000000..44c381834e6
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp
@@ -0,0 +1,181 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/process_interface/replica_set_node_process_interface.h"
+
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/drop_collection.h"
+#include "mongo/db/catalog/list_indexes.h"
+#include "mongo/db/catalog/rename_collection.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/index_builds_coordinator.h"
+
+namespace mongo {
+
+namespace {
+const auto replicaSetNodeExecutor =
+ ServiceContext::declareDecoration<std::unique_ptr<executor::TaskExecutor>>();
+} // namespace
+
+executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
+ ServiceContext* service) {
+ return replicaSetNodeExecutor(service).get();
+}
+
+executor::TaskExecutor* ReplicaSetNodeProcessInterface::getReplicaSetNodeExecutor(
+ OperationContext* opCtx) {
+ return getReplicaSetNodeExecutor(opCtx->getServiceContext());
+}
+
+void ReplicaSetNodeProcessInterface::setReplicaSetNodeExecutor(
+ ServiceContext* service, std::unique_ptr<executor::TaskExecutor> executor) {
+ replicaSetNodeExecutor(service) = std::move(executor);
+}
+
+Status ReplicaSetNodeProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
+ auto writeResults = performInserts(
+ expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+
+ // Need to check each result in the batch since the writes are unordered.
+ for (const auto& result : writeResults.results) {
+ if (result.getStatus() != Status::OK()) {
+ return result.getStatus();
+ }
+ }
+ return Status::OK();
+}
+
+StatusWith<MongoProcessInterface::UpdateResult> ReplicaSetNodeProcessInterface::update(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ BatchedObjects&& batch,
+ const WriteConcernOptions& wc,
+ UpsertType upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) {
+ auto writeResults =
+ performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+
+ // Need to check each result in the batch since the writes are unordered.
+ UpdateResult updateResult;
+ for (const auto& result : writeResults.results) {
+ if (result.getStatus() != Status::OK()) {
+ return result.getStatus();
+ }
+
+ updateResult.nMatched += result.getValue().getN();
+ updateResult.nModified += result.getValue().getNModified();
+ }
+ return updateResult;
+}
+
+std::list<BSONObj> ReplicaSetNodeProcessInterface::getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) {
+ return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs);
+}
+
+void ReplicaSetNodeProcessInterface::createIndexesOnEmptyCollection(
+ OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) {
+ AutoGetCollection autoColl(opCtx, ns, MODE_X);
+ writeConflictRetry(
+ opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] {
+ auto collection = autoColl.getCollection();
+ invariant(collection,
+ str::stream() << "Failed to create indexes for aggregation because "
+ "collection does not exist: "
+ << ns << ": " << BSON("indexes" << indexSpecs));
+
+ invariant(0U == collection->numRecords(opCtx),
+ str::stream() << "Expected empty collection for index creation: " << ns
+ << ": numRecords: " << collection->numRecords(opCtx) << ": "
+ << BSON("indexes" << indexSpecs));
+
+ // Secondary index builds do not filter existing indexes so we have to do this on the
+ // primary.
+ auto removeIndexBuildsToo = false;
+ auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes(
+ opCtx, indexSpecs, removeIndexBuildsToo);
+ if (filteredIndexes.empty()) {
+ return;
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+ IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection(
+ opCtx, collection->uuid(), filteredIndexes, false // fromMigrate
+ );
+ wuow.commit();
+ });
+}
+void ReplicaSetNodeProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) {
+ NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String());
+ doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx,
+ sourceNs,
+ targetNs,
+ renameCommandObj["dropTarget"].trueValue(),
+ renameCommandObj["stayTemp"].trueValue(),
+ originalIndexes,
+ originalCollectionOptions);
+}
+
+void ReplicaSetNodeProcessInterface::createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj));
+}
+
+void ReplicaSetNodeProcessInterface::dropCollection(OperationContext* opCtx,
+ const NamespaceString& ns) {
+ BSONObjBuilder result;
+ uassertStatusOK(mongo::dropCollection(
+ opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops));
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter>
+ReplicaSetNodeProcessInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards) {
+ return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
new file mode 100644
index 00000000000..992e28b6839
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+
+namespace mongo {
+
+/**
+ * An implementation of the MongoProcessInterface used on replica set nodes when sharding is not
+ * enabled.
+ */
+class ReplicaSetNodeProcessInterface final : public NonShardServerProcessInterface {
+public:
+ using NonShardServerProcessInterface::NonShardServerProcessInterface;
+
+ static executor::TaskExecutor* getReplicaSetNodeExecutor(ServiceContext* service);
+
+ static executor::TaskExecutor* getReplicaSetNodeExecutor(OperationContext* opCtx);
+
+ static void setReplicaSetNodeExecutor(ServiceContext* service,
+ std::unique_ptr<executor::TaskExecutor> executor);
+
+ ReplicaSetNodeProcessInterface(OperationContext* opCtx, executor::TaskExecutor* executor)
+ : NonShardServerProcessInterface(opCtx), _executor(executor) {}
+
+ virtual ~ReplicaSetNodeProcessInterface() = default;
+
+ Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) final;
+ StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ BatchedObjects&& batch,
+ const WriteConcernOptions& wc,
+ UpsertType upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) override;
+
+ std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs);
+ void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes);
+ void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
+ void dropCollection(OperationContext* opCtx, const NamespaceString& collection);
+ void createIndexesOnEmptyCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs);
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline,
+ bool allowTargetingShards) override;
+
+private:
+ executor::TaskExecutor* _executor;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp b/src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp
new file mode 100644
index 00000000000..f4a805a595f
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.cpp
@@ -0,0 +1,160 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
+
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/drop_collection.h"
+#include "mongo/db/catalog/list_indexes.h"
+#include "mongo/db/catalog/rename_collection.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/index_builds_coordinator.h"
+
+namespace mongo {
+
+Status StandaloneProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
+ auto writeResults = performInserts(
+ expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+
+ // Need to check each result in the batch since the writes are unordered.
+ for (const auto& result : writeResults.results) {
+ if (result.getStatus() != Status::OK()) {
+ return result.getStatus();
+ }
+ }
+ return Status::OK();
+}
+
+StatusWith<MongoProcessInterface::UpdateResult> StandaloneProcessInterface::update(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ BatchedObjects&& batch,
+ const WriteConcernOptions& wc,
+ UpsertType upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) {
+ auto writeResults =
+ performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi));
+
+ // Need to check each result in the batch since the writes are unordered.
+ UpdateResult updateResult;
+ for (const auto& result : writeResults.results) {
+ if (result.getStatus() != Status::OK()) {
+ return result.getStatus();
+ }
+
+ updateResult.nMatched += result.getValue().getN();
+ updateResult.nModified += result.getValue().getNModified();
+ }
+ return updateResult;
+}
+
+std::list<BSONObj> StandaloneProcessInterface::getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs) {
+ return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs);
+}
+
+void StandaloneProcessInterface::createIndexesOnEmptyCollection(
+ OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) {
+ AutoGetCollection autoColl(opCtx, ns, MODE_X);
+ writeConflictRetry(
+ opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] {
+ auto collection = autoColl.getCollection();
+ invariant(collection,
+ str::stream() << "Failed to create indexes for aggregation because "
+ "collection does not exist: "
+ << ns << ": " << BSON("indexes" << indexSpecs));
+
+ invariant(0U == collection->numRecords(opCtx),
+ str::stream() << "Expected empty collection for index creation: " << ns
+ << ": numRecords: " << collection->numRecords(opCtx) << ": "
+ << BSON("indexes" << indexSpecs));
+
+ // Secondary index builds do not filter existing indexes so we have to do this on the
+ // primary.
+ auto removeIndexBuildsToo = false;
+ auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes(
+ opCtx, indexSpecs, removeIndexBuildsToo);
+ if (filteredIndexes.empty()) {
+ return;
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+ IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection(
+ opCtx, collection->uuid(), filteredIndexes, false // fromMigrate
+ );
+ wuow.commit();
+ });
+}
+void StandaloneProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) {
+ NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String());
+ doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx,
+ sourceNs,
+ targetNs,
+ renameCommandObj["dropTarget"].trueValue(),
+ renameCommandObj["stayTemp"].trueValue(),
+ originalIndexes,
+ originalCollectionOptions);
+}
+
+void StandaloneProcessInterface::createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj) {
+ uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj));
+}
+
+void StandaloneProcessInterface::dropCollection(OperationContext* opCtx,
+ const NamespaceString& ns) {
+ BSONObjBuilder result;
+ uassertStatusOK(mongo::dropCollection(
+ opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops));
+}
+
+std::unique_ptr<Pipeline, PipelineDeleter> StandaloneProcessInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* ownedPipeline,
+ bool allowTargetingShards) {
+ return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/standalone_process_interface.h b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
new file mode 100644
index 00000000000..3fa54515599
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+
+namespace mongo {
+
+/**
+ * Class to provide access to standalone specific implementations of methods required by some
+ * document sources.
+ */
+class StandaloneProcessInterface : public NonShardServerProcessInterface {
+public:
+ using NonShardServerProcessInterface::NonShardServerProcessInterface;
+
+ virtual ~StandaloneProcessInterface() = default;
+
+ Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) override;
+ StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ BatchedObjects&& batch,
+ const WriteConcernOptions& wc,
+ UpsertType upsert,
+ bool multi,
+ boost::optional<OID> targetEpoch) override;
+ std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
+ const NamespaceString& ns,
+ bool includeBuildUUIDs);
+ void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes);
+ void createCollection(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj);
+ void dropCollection(OperationContext* opCtx, const NamespaceString& collection);
+ void createIndexesOnEmptyCollection(OperationContext* opCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& indexSpecs);
+ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline,
+ bool allowTargetingShards) override;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp
index 14df45018df..5d7cc9d535a 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp
+++ b/src/mongo/db/pipeline/process_interface/standalone_process_interface_test.cpp
@@ -30,15 +30,15 @@
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
-class MongoProcessInterfaceForTest : public NonShardServerProcessInterface {
+class MongoProcessInterfaceForTest : public StandaloneProcessInterface {
public:
- using NonShardServerProcessInterface::NonShardServerProcessInterface;
+ using StandaloneProcessInterface::StandaloneProcessInterface;
bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/embedded/process_interface_factory_embedded.cpp b/src/mongo/embedded/process_interface_factory_embedded.cpp
index 01b8d1c1d7e..3011e95b453 100644
--- a/src/mongo/embedded/process_interface_factory_embedded.cpp
+++ b/src/mongo/embedded/process_interface_factory_embedded.cpp
@@ -30,13 +30,13 @@
#include "mongo/platform/basic.h"
#include "mongo/base/shim.h"
-#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h"
+#include "mongo/db/pipeline/process_interface/standalone_process_interface.h"
namespace mongo {
namespace {
std::shared_ptr<MongoProcessInterface> mongoProcessInterfaceCreateImpl(OperationContext* opCtx) {
- return std::make_shared<NonShardServerProcessInterface>(opCtx);
+ return std::make_shared<StandaloneProcessInterface>(opCtx);
}
auto mongoProcessInterfaceCreateRegistration = MONGO_WEAK_FUNCTION_REGISTRATION(