summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorHenrik Edin <henrik.edin@mongodb.com>2018-09-24 14:10:22 -0400
committerHenrik Edin <henrik.edin@mongodb.com>2018-10-03 14:46:14 -0400
commit58501015eff7961dc378abe1d49e064a3dcf3dbc (patch)
treee0b41dd135ec7657605ca87e99b9ef2d3fa3baed /src/mongo
parent860b392d9d3c006090a4c7fc3c6f3fa5460e5c5c (diff)
downloadmongo-58501015eff7961dc378abe1d49e064a3dcf3dbc.tar.gz
SERVER-37295 Remove embedded dependency on process interface shard server.
Refactor process interface system to use shim to allow for separate factories for embedded and mongod.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/SConscript1
-rw-r--r--src/mongo/db/commands/SConscript2
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp4
-rw-r--r--src/mongo/db/pipeline/SConscript37
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.cpp37
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h8
-rw-r--r--src/mongo/db/pipeline/process_interface_factory_mongod.cpp43
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp245
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h67
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp (renamed from src/mongo/db/pipeline/mongod_process_interface.cpp)190
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h (renamed from src/mongo/db/pipeline/mongod_process_interface.h)55
-rw-r--r--src/mongo/embedded/SConscript2
-rw-r--r--src/mongo/embedded/process_interface_factory_embedded.cpp40
13 files changed, 539 insertions, 192 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 7d82797d5e1..593923ce799 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -314,6 +314,7 @@ env.Library(
'db/mongodandmongos',
'db/periodic_runner_job_abort_expired_transactions',
'db/periodic_runner_job_decrease_snapshot_cache_pressure',
+ 'db/pipeline/process_interface_factory_mongod',
'db/query_exec',
'db/repair_database',
'db/repair_database_and_check_version',
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 984e9ce80a2..ec5b2339ae6 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -243,7 +243,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
- '$BUILD_DIR/mongo/db/pipeline/mongod_process_interface',
+ '$BUILD_DIR/mongo/db/pipeline/mongo_process_interface',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/rw_concern_d',
'$BUILD_DIR/mongo/db/stats/counters',
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 7970dcac23e..ff5e0268dd6 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -49,7 +49,7 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
-#include "mongo/db/pipeline/mongod_process_interface.h"
+#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
@@ -328,7 +328,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
new ExpressionContext(opCtx,
request,
std::move(collator),
- MongoDInterface::create(opCtx),
+ MongoProcessInterface::create(opCtx),
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)),
uuid);
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index d32e34b061e..8da8342dbbe 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -264,6 +264,16 @@ env.Library(
)
env.Library(
+ target='mongo_process_interface',
+ source=[
+ 'mongo_process_interface.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ ]
+)
+
+env.Library(
target='mongo_process_common',
source=[
'mongo_process_common.cpp',
@@ -276,15 +286,13 @@ env.Library(
)
env.Library(
- target='mongod_process_interface',
+ target='process_interface_standalone',
source=[
- 'mongod_process_interface.cpp',
+ 'process_interface_standalone.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/query_exec',
- '$BUILD_DIR/mongo/db/stats/top',
- '$BUILD_DIR/mongo/s/sharding_api',
'mongo_process_common',
],
LIBDEPS_PRIVATE=[
@@ -293,6 +301,17 @@ env.Library(
)
env.Library(
+ target='process_interface_shardsvr',
+ source=[
+ 'process_interface_shardsvr.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/s/sharding_api',
+ 'process_interface_standalone',
+ ],
+)
+
+env.Library(
target='mongos_process_interface',
source=[
'mongos_process_interface.cpp',
@@ -304,6 +323,16 @@ env.Library(
]
)
+env.Library(
+ target="process_interface_factory_mongod",
+ source=[
+ "process_interface_factory_mongod.cpp",
+ ],
+ LIBDEPS_PRIVATE=[
+ 'process_interface_shardsvr',
+ ],
+)
+
pipelineeEnv = env.Clone()
pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
pipelineeEnv.Library(
diff --git a/src/mongo/db/pipeline/mongo_process_interface.cpp b/src/mongo/db/pipeline/mongo_process_interface.cpp
new file mode 100644
index 00000000000..547c1760148
--- /dev/null
+++ b/src/mongo/db/pipeline/mongo_process_interface.cpp
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo_process_interface.h"
+
+namespace mongo {
+
+MONGO_DEFINE_SHIM(MongoProcessInterface::create);
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 974fd2807d4..664c956c33a 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -35,6 +35,7 @@
#include <string>
#include <vector>
+#include "mongo/base/shim.h"
#include "mongo/client/dbclient_base.h"
#include "mongo/db/collection_index_usage_tracker.h"
#include "mongo/db/generic_cursor.h"
@@ -68,6 +69,13 @@ public:
enum class CurrentOpSessionsMode { kIncludeIdle, kExcludeIdle };
enum class CurrentOpCursorMode { kIncludeCursors, kExcludeCursors };
+ /**
+ * Factory function to create MongoProcessInterface of the right type. The implementation will
+ * be installed by a lib higher up in the link graph depending on the application type.
+ */
+ static MONGO_DECLARE_SHIM(
+ (OperationContext * opCtx)->std::shared_ptr<MongoProcessInterface>) create;
+
struct MakePipelineOptions {
MakePipelineOptions(){};
diff --git a/src/mongo/db/pipeline/process_interface_factory_mongod.cpp b/src/mongo/db/pipeline/process_interface_factory_mongod.cpp
new file mode 100644
index 00000000000..04e93b15d4d
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface_factory_mongod.cpp
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/process_interface_shardsvr.h"
+
+#include "mongo/db/s/sharding_state.h"
+
+namespace mongo {
+
+MONGO_REGISTER_SHIM(MongoProcessInterface::create)
+(OperationContext* opCtx)->std::shared_ptr<MongoProcessInterface> {
+ return ShardingState::get(opCtx)->enabled() ? std::make_shared<MongoInterfaceShardServer>(opCtx)
+ : std::make_shared<MongoInterfaceStandalone>(opCtx);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
new file mode 100644
index 00000000000..e86b7202d2b
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -0,0 +1,245 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/process_interface_shardsvr.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/catalog/uuid_catalog.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/session_catalog.h"
+#include "mongo/db/stats/fill_locker_info.h"
+#include "mongo/db/stats/storage_stats.h"
+#include "mongo/db/storage/backup_cursor_hooks.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/write_ops/cluster_write.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using write_ops::Insert;
+using write_ops::Update;
+using write_ops::UpdateOpEntry;
+
+namespace {
+
+/**
+ * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
+ */
+Insert buildInsertOp(const NamespaceString& nss,
+ std::vector<BSONObj>&& objs,
+ bool bypassDocValidation) {
+ Insert insertOp(nss);
+ insertOp.setDocuments(std::move(objs));
+ insertOp.setWriteCommandBase([&] {
+ write_ops::WriteCommandBase wcb;
+ wcb.setOrdered(true);
+ wcb.setBypassDocumentValidation(bypassDocValidation);
+ return wcb;
+ }());
+ return insertOp;
+}
+
+/**
+ * Builds an ordered update op on namespace 'nss' with update entries {q: <queries>, u: <updates>}.
+ *
+ * Note that 'queries' and 'updates' must be the same length.
+ */
+Update buildUpdateOp(const NamespaceString& nss,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi,
+ bool bypassDocValidation) {
+ Update updateOp(nss);
+ updateOp.setUpdates([&] {
+ std::vector<UpdateOpEntry> updateEntries;
+ for (size_t index = 0; index < queries.size(); ++index) {
+ updateEntries.push_back([&] {
+ UpdateOpEntry entry;
+ entry.setQ(std::move(queries[index]));
+ entry.setU(std::move(updates[index]));
+ entry.setUpsert(upsert);
+ entry.setMulti(multi);
+ return entry;
+ }());
+ }
+ return updateEntries;
+ }());
+ updateOp.setWriteCommandBase([&] {
+ write_ops::WriteCommandBase wcb;
+ wcb.setOrdered(true);
+ wcb.setBypassDocumentValidation(bypassDocValidation);
+ return wcb;
+ }());
+ return updateOp;
+}
+
+// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each
+// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of
+// index.
+bool keyPatternNamesExactPaths(const BSONObj& keyPattern,
+ const std::set<FieldPath>& uniqueKeyPaths) {
+ size_t nFieldsMatched = 0;
+ for (auto&& elem : keyPattern) {
+ if (!elem.isNumber()) {
+ return false;
+ }
+ if (uniqueKeyPaths.find(elem.fieldNameStringData()) == uniqueKeyPaths.end()) {
+ return false;
+ }
+ ++nFieldsMatched;
+ }
+ return nFieldsMatched == uniqueKeyPaths.size();
+}
+
+bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const IndexCatalogEntry* index,
+ const std::set<FieldPath>& uniqueKeyPaths) {
+ return (index->descriptor()->unique() && !index->descriptor()->isPartial() &&
+ keyPatternNamesExactPaths(index->descriptor()->keyPattern(), uniqueKeyPaths) &&
+ CollatorInterface::collatorsMatch(index->getCollator(), expCtx->getCollator()));
+}
+
+} // namespace
+
+std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields(
+ OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
+ invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
+
+ boost::optional<UUID> uuid;
+ NamespaceString nss;
+ if (nssOrUUID.uuid()) {
+ uuid = *(nssOrUUID.uuid());
+ nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(*uuid);
+ // An empty namespace indicates that the collection has been dropped. Treat it as unsharded
+ // and mark the fields as final.
+ if (nss.isEmpty()) {
+ return {{"_id"}, true};
+ }
+ } else if (nssOrUUID.nss()) {
+ nss = *(nssOrUUID.nss());
+ }
+
+ // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache
+ // to determine whether the collection is sharded in the first place.
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ const bool collectionIsSharded = catalogCache && [&]() {
+ auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss);
+ return routingInfo.isOK() && routingInfo.getValue().cm();
+ }();
+
+ // Collection exists and is not sharded, mark as not final.
+ if (!collectionIsSharded) {
+ return {{"_id"}, false};
+ }
+
+ auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
+ }();
+
+ // If the UUID is set in 'nssOrUuid', check that the UUID in the ScopedCollectionMetadata
+ // matches. Otherwise, this implies that the collection has been dropped and recreated as
+ // sharded.
+ if (!scm->isSharded() || (uuid && !scm->uuidMatches(*uuid))) {
+ return {{"_id"}, false};
+ }
+
+ // Unpack the shard key.
+ std::vector<FieldPath> result;
+ bool gotId = false;
+ for (auto& field : scm->getKeyPatternFields()) {
+ result.emplace_back(field->dottedField());
+ gotId |= (result.back().fullPath() == "_id");
+ }
+ if (!gotId) { // If not part of the shard key, "_id" comes last.
+ result.emplace_back("_id");
+ }
+ // Collection is now sharded so the document key fields will never change, mark as final.
+ return {result, true};
+}
+
+void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs) {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+
+ ClusterWriter::write(
+ expCtx->opCtx,
+ BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)),
+ &stats,
+ &response);
+ // TODO SERVER-35403: Add more context for which shard produced the error.
+ uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
+}
+
+void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi) {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ ClusterWriter::write(expCtx->opCtx,
+ BatchedCommandRequest(buildUpdateOp(ns,
+ std::move(queries),
+ std::move(updates),
+ upsert,
+ multi,
+ expCtx->bypassDocumentValidation)),
+ &stats,
+ &response);
+ // TODO SERVER-35403: Add more context for which shard produced the error.
+ uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
new file mode 100644
index 00000000000..4685e27735d
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/process_interface_standalone.h"
+
+namespace mongo {
+
+/**
+ * Specialized version of the MongoDInterface when this node is a shard server.
+ */
+class MongoInterfaceShardServer final : public MongoInterfaceStandalone {
+public:
+ using MongoInterfaceStandalone::MongoInterfaceStandalone;
+
+ std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(
+ OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const final;
+
+ /**
+ * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking,
+ * routing, stale config handling, etc.
+ */
+ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs) final;
+
+ /**
+ * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
+ * routing, stale config handling, etc.
+ */
+ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi) final;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 0141726f22e..cb1a2a9a803 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/pipeline/mongod_process_interface.h"
+#include "mongo/db/pipeline/process_interface_standalone.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
@@ -148,32 +148,25 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
-// static
-std::shared_ptr<MongoProcessInterface> MongoDInterface::create(OperationContext* opCtx) {
- return ShardingState::get(opCtx)->enabled()
- ? std::make_shared<MongoDInterfaceShardServer>(opCtx)
- : std::make_shared<MongoDInterface>(opCtx);
-}
-
-MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {}
+MongoInterfaceStandalone::MongoInterfaceStandalone(OperationContext* opCtx) : _client(opCtx) {}
-void MongoDInterface::setOperationContext(OperationContext* opCtx) {
+void MongoInterfaceStandalone::setOperationContext(OperationContext* opCtx) {
_client.setOpCtx(opCtx);
}
-DBClientBase* MongoDInterface::directClient() {
+DBClientBase* MongoInterfaceStandalone::directClient() {
return &_client;
}
-bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
+bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
AutoGetCollectionForReadCommand autoColl(opCtx, nss);
auto const css = CollectionShardingState::get(opCtx, nss);
return css->getMetadata(opCtx)->isSharded();
}
-void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs) {
+void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs) {
auto writeResults = performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
@@ -182,12 +175,12 @@ void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expC
uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Insert failed: ");
}
-void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
- bool upsert,
- bool multi) {
+void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi) {
auto writeResults = performUpdates(expCtx->opCtx,
buildUpdateOp(ns,
std::move(queries),
@@ -201,8 +194,8 @@ void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expC
uassertStatusOKWithContext(writeResults.results.back().getStatus(), "Update failed: ");
}
-CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) {
+CollectionIndexUsageMap MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) {
AutoGetCollectionForReadCommand autoColl(opCtx, ns);
Collection* collection = autoColl.getCollection();
@@ -214,32 +207,32 @@ CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx,
return collection->infoCache()->getIndexUsageStats();
}
-void MongoDInterface::appendLatencyStats(OperationContext* opCtx,
- const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const {
+void MongoInterfaceStandalone::appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const {
Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder);
}
-Status MongoDInterface::appendStorageStats(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const {
+Status MongoInterfaceStandalone::appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const {
return appendCollectionStorageStats(opCtx, nss, param, builder);
}
-Status MongoDInterface::appendRecordCount(OperationContext* opCtx,
- const NamespaceString& nss,
- BSONObjBuilder* builder) const {
+Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const {
return appendCollectionRecordCount(opCtx, nss, builder);
}
-BSONObj MongoDInterface::getCollectionOptions(const NamespaceString& nss) {
+BSONObj MongoInterfaceStandalone::getCollectionOptions(const NamespaceString& nss) {
const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
}
-void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged(
+void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged(
OperationContext* opCtx,
const BSONObj& renameCommandObj,
const NamespaceString& targetNs,
@@ -272,7 +265,7 @@ void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged(
_client.runCommand("admin", renameCommandObj, info));
}
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoDInterface::makePipeline(
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoInterfaceStandalone::makePipeline(
const std::vector<BSONObj>& rawPipeline,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts) {
@@ -294,7 +287,7 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoDInterface::makePipe
return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
}
-Status MongoDInterface::attachCursorSourceToPipeline(
+Status MongoInterfaceStandalone::attachCursorSourceToPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
invariant(pipeline->getSources().empty() ||
!dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
@@ -331,7 +324,7 @@ Status MongoDInterface::attachCursorSourceToPipeline(
return Status::OK();
}
-std::string MongoDInterface::getShardName(OperationContext* opCtx) const {
+std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const {
if (ShardingState::get(opCtx)->enabled()) {
return ShardingState::get(opCtx)->shardId().toString();
}
@@ -339,71 +332,17 @@ std::string MongoDInterface::getShardName(OperationContext* opCtx) const {
return std::string();
}
-std::pair<std::vector<FieldPath>, bool> MongoDInterface::collectDocumentKeyFields(
+std::pair<std::vector<FieldPath>, bool> MongoInterfaceStandalone::collectDocumentKeyFields(
OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
- if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
- return {{"_id"}, false}; // Nothing is sharded.
- }
- boost::optional<UUID> uuid;
- NamespaceString nss;
- if (nssOrUUID.uuid()) {
- uuid = *(nssOrUUID.uuid());
- nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(*uuid);
- // An empty namespace indicates that the collection has been dropped. Treat it as unsharded
- // and mark the fields as final.
- if (nss.isEmpty()) {
- return {{"_id"}, true};
- }
- } else if (nssOrUUID.nss()) {
- nss = *(nssOrUUID.nss());
- }
-
- // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache
- // to determine whether the collection is sharded in the first place.
- auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- const bool collectionIsSharded = catalogCache && [&]() {
- auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss);
- return routingInfo.isOK() && routingInfo.getValue().cm();
- }();
-
- // Collection exists and is not sharded, mark as not final.
- if (!collectionIsSharded) {
- return {{"_id"}, false};
- }
-
- auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
- AutoGetCollection autoColl(opCtx, nss, MODE_IS);
- return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
- }();
-
- // If the UUID is set in 'nssOrUuid', check that the UUID in the ScopedCollectionMetadata
- // matches. Otherwise, this implies that the collection has been dropped and recreated as
- // sharded.
- if (!scm->isSharded() || (uuid && !scm->uuidMatches(*uuid))) {
- return {{"_id"}, false};
- }
-
- // Unpack the shard key.
- std::vector<FieldPath> result;
- bool gotId = false;
- for (auto& field : scm->getKeyPatternFields()) {
- result.emplace_back(field->dottedField());
- gotId |= (result.back().fullPath() == "_id");
- }
- if (!gotId) { // If not part of the shard key, "_id" comes last.
- result.emplace_back("_id");
- }
- // Collection is now sharded so the document key fields will never change, mark as final.
- return {result, true};
+ return {{"_id"}, false}; // Nothing is sharded.
}
-std::vector<GenericCursor> MongoDInterface::getIdleCursors(
+std::vector<GenericCursor> MongoInterfaceStandalone::getIdleCursors(
const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const {
return CursorManager::getIdleCursors(expCtx->opCtx, userMode);
}
-boost::optional<Document> MongoDInterface::lookupSingleDocument(
+boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
UUID collectionUUID,
@@ -438,7 +377,7 @@ boost::optional<Document> MongoDInterface::lookupSingleDocument(
return lookedUpDocument;
}
-BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) {
+BackupCursorState MongoInterfaceStandalone::openBackupCursor(OperationContext* opCtx) {
auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext());
if (backupCursorHooks->enabled()) {
return backupCursorHooks->openBackupCursor(opCtx);
@@ -447,7 +386,7 @@ BackupCursorState MongoDInterface::openBackupCursor(OperationContext* opCtx) {
}
}
-void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) {
+void MongoInterfaceStandalone::closeBackupCursor(OperationContext* opCtx, std::uint64_t cursorId) {
auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext());
if (backupCursorHooks->enabled()) {
backupCursorHooks->closeBackupCursor(opCtx, cursorId);
@@ -456,7 +395,7 @@ void MongoDInterface::closeBackupCursor(OperationContext* opCtx, std::uint64_t c
}
}
-std::vector<BSONObj> MongoDInterface::getMatchingPlanCacheEntryStats(
+std::vector<BSONObj> MongoInterfaceStandalone::getMatchingPlanCacheEntryStats(
OperationContext* opCtx, const NamespaceString& nss, const MatchExpression* matchExp) const {
const auto serializer = [](const PlanCacheEntry& entry) {
BSONObjBuilder out;
@@ -481,7 +420,7 @@ std::vector<BSONObj> MongoDInterface::getMatchingPlanCacheEntryStats(
return planCache->getMatchingStats(serializer, predicate);
}
-bool MongoDInterface::uniqueKeyIsSupportedByIndex(
+bool MongoInterfaceStandalone::uniqueKeyIsSupportedByIndex(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
const std::set<FieldPath>& uniqueKeyPaths) const {
@@ -509,9 +448,8 @@ bool MongoDInterface::uniqueKeyIsSupportedByIndex(
return false;
}
-BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx,
- Client* client,
- CurrentOpTruncateMode truncateOps) const {
+BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient(
+ OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const {
BSONObjBuilder builder;
CurOp::reportCurrentOpForClient(
@@ -534,9 +472,9 @@ BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx,
return builder.obj();
}
-void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
- CurrentOpUserMode userMode,
- std::vector<BSONObj>* ops) const {
+void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
+ CurrentOpUserMode userMode,
+ std::vector<BSONObj>* ops) const {
auto sessionCatalog = SessionCatalog::get(opCtx);
const bool authEnabled =
@@ -562,7 +500,7 @@ void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
});
}
-std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollator(
+std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefaultCollator(
OperationContext* opCtx, StringData dbName, UUID collectionUUID) {
auto it = _collatorCache.find(collectionUUID);
if (it == _collatorCache.end()) {
@@ -586,40 +524,4 @@ std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollato
return collator ? collator->clone() : nullptr;
}
-void MongoDInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs) {
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
-
- ClusterWriter::write(
- expCtx->opCtx,
- BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)),
- &stats,
- &response);
- // TODO SERVER-35403: Add more context for which shard produced the error.
- uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
-}
-
-void MongoDInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
- bool upsert,
- bool multi) {
- BatchedCommandResponse response;
- BatchWriteExecStats stats;
- ClusterWriter::write(expCtx->opCtx,
- BatchedCommandRequest(buildUpdateOp(ns,
- std::move(queries),
- std::move(updates),
- upsert,
- multi,
- expCtx->bypassDocumentValidation)),
- &stats,
- &response);
- // TODO SERVER-35403: Add more context for which shard produced the error.
- uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
-}
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/process_interface_standalone.h
index cb9614f2172..981ca2b35c3 100644
--- a/src/mongo/db/pipeline/mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -38,26 +38,26 @@ namespace mongo {
* Class to provide access to mongod-specific implementations of methods required by some
* document sources.
*/
-class MongoDInterface : public MongoProcessCommon {
+class MongoInterfaceStandalone : public MongoProcessCommon {
public:
- static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx);
+ // static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx);
- MongoDInterface(OperationContext* opCtx);
+ MongoInterfaceStandalone(OperationContext* opCtx);
- virtual ~MongoDInterface() = default;
+ virtual ~MongoInterfaceStandalone() = default;
void setOperationContext(OperationContext* opCtx) final;
DBClientBase* directClient() final;
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
- virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs);
- virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
- bool upsert,
- bool multi);
+ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& objs) override;
+ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ std::vector<BSONObj>&& queries,
+ std::vector<BSONObj>&& updates,
+ bool upsert,
+ bool multi) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
@@ -84,7 +84,7 @@ public:
Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(
- OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const final;
+ OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const override;
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
@@ -129,31 +129,4 @@ private:
std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache;
};
-/**
- * Specialized version of the MongoDInterface when this node is a shard server.
- */
-class MongoDInterfaceShardServer final : public MongoDInterface {
-public:
- using MongoDInterface::MongoDInterface;
-
- /**
- * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking,
- * routing, stale config handling, etc.
- */
- void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& objs) final;
-
- /**
- * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
- * routing, stale config handling, etc.
- */
- void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- std::vector<BSONObj>&& queries,
- std::vector<BSONObj>&& updates,
- bool upsert,
- bool multi) final;
-};
-
} // namespace mongo
diff --git a/src/mongo/embedded/SConscript b/src/mongo/embedded/SConscript
index 26bd7f7f80a..5b54fadee8d 100644
--- a/src/mongo/embedded/SConscript
+++ b/src/mongo/embedded/SConscript
@@ -66,6 +66,7 @@ env.Library(
'embedded_options_parser_init.cpp',
'logical_session_cache_factory_embedded.cpp',
'periodic_runner_embedded.cpp',
+ 'process_interface_factory_embedded.cpp',
'replication_coordinator_embedded.cpp',
'service_entry_point_embedded.cpp',
],
@@ -84,6 +85,7 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/op_observer_impl',
+ '$BUILD_DIR/mongo/db/pipeline/process_interface_standalone',
'$BUILD_DIR/mongo/db/repair_database_and_check_version',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/replica_set_messages',
diff --git a/src/mongo/embedded/process_interface_factory_embedded.cpp b/src/mongo/embedded/process_interface_factory_embedded.cpp
new file mode 100644
index 00000000000..1a59855ccfe
--- /dev/null
+++ b/src/mongo/embedded/process_interface_factory_embedded.cpp
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/process_interface_standalone.h"
+
+namespace mongo {
+
+MONGO_REGISTER_SHIM(MongoProcessInterface::create)
+(OperationContext* opCtx)->std::shared_ptr<MongoProcessInterface> {
+ return std::make_shared<MongoInterfaceStandalone>(opCtx);
+}
+
+} // namespace mongo