diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2020-01-29 09:01:54 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-05 21:21:37 +0000 |
commit | 8d6496fe97e9b8f5c709778cd975071ad0a69e1c (patch) | |
tree | 6d517e123d0c2d0983b31bf86fc79ec12ae0aff0 /src/mongo/db/pipeline | |
parent | 969151e9ab69dcb53397cf40f810e718421db081 (diff) | |
download | mongo-8d6496fe97e9b8f5c709778cd975071ad0a69e1c.tar.gz |
SERVER-45838 Rename/restructure mongo_process_interface classes and files
create mode 100644 src/mongo/db/pipeline/process_interface/SConscript
rename src/mongo/db/pipeline/{process_interface_standalone.cpp => process_interface/common_mongod_process_interface.cpp} (72%)
rename src/mongo/db/pipeline/{process_interface_standalone.h => process_interface/common_mongod_process_interface.h} (69%)
rename src/mongo/db/pipeline/{mongo_process_common.cpp => process_interface/common_process_interface.cpp} (91%)
rename src/mongo/db/pipeline/{mongo_process_common.h => process_interface/common_process_interface.h} (91%)
rename src/mongo/db/pipeline/{ => process_interface}/mongo_process_interface.cpp (96%)
rename src/mongo/db/pipeline/{ => process_interface}/mongo_process_interface.h (100%)
rename src/mongo/db/pipeline/{process_interface_factory_mongod.cpp => process_interface/mongod_process_interface_factory.cpp} (84%)
rename src/mongo/db/pipeline/{ => process_interface}/mongos_process_interface.cpp (93%)
rename src/mongo/db/pipeline/{ => process_interface}/mongos_process_interface.h (97%)
rename src/mongo/db/pipeline/{ => process_interface}/mongos_process_interface_test.cpp (84%)
create mode 100644 src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
create mode 100644 src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
rename src/mongo/db/pipeline/{process_interface_shardsvr.cpp => process_interface/shardsvr_process_interface.cpp} (88%)
rename src/mongo/db/pipeline/{process_interface_shardsvr.h => process_interface/shardsvr_process_interface.h} (95%)
rename src/mongo/db/pipeline/{process_interface_standalone_test.cpp => process_interface/shardsvr_process_interface_test.cpp} (95%)
rename src/mongo/db/pipeline/{stub_mongo_process_interface_lookup_single_document.cpp => process_interface/stub_lookup_single_document_process_interface.cpp} (89%)
rename src/mongo/db/pipeline/{stub_mongo_process_interface_lookup_single_document.h => process_interface/stub_lookup_single_document_process_interface.h} (93%)
rename src/mongo/db/pipeline/{ => process_interface}/stub_mongo_process_interface.h (99%)
Diffstat (limited to 'src/mongo/db/pipeline')
35 files changed, 631 insertions, 516 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 004469afc0e..2905a7d808b 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -4,6 +4,15 @@ Import('env') env = env.Clone() +env.SConscript( + dirs=[ + 'process_interface', + ], + exports=[ + 'env', + ], +) + env.Library( target='aggregation', source=[ @@ -145,7 +154,7 @@ env.Library( target='document_source_mock', source=[ 'document_source_mock.cpp', - 'stub_mongo_process_interface_lookup_single_document.cpp', + 'process_interface/stub_lookup_single_document_process_interface.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/query/query_test_service_context', @@ -178,93 +187,6 @@ env.Library( ) env.Library( - target='mongo_process_interface', - source=[ - 'mongo_process_interface.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/base', - ] -) - -env.Library( - target='mongo_process_common', - source=[ - 'mongo_process_common.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/auth', - '$BUILD_DIR/mongo/db/generic_cursor', - '$BUILD_DIR/mongo/s/sharding_router_api', - 'field_path', - ] -) - -env.Library( - target='process_interface_standalone', - source=[ - 'process_interface_standalone.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/catalog/catalog_helpers', - '$BUILD_DIR/mongo/db/ops/write_ops_exec', - '$BUILD_DIR/mongo/db/query_exec', - '$BUILD_DIR/mongo/db/repl/speculative_majority_read_info', - 'mongo_process_common', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/catalog/database_holder', - '$BUILD_DIR/mongo/db/concurrency/flow_control_ticketholder', - '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', - '$BUILD_DIR/mongo/db/session_catalog', - '$BUILD_DIR/mongo/db/storage/backup_cursor_hooks', - '$BUILD_DIR/mongo/db/transaction', - '$BUILD_DIR/mongo/scripting/scripting_common', - ], -) - -env.Library( - target='process_interface_shardsvr', - source=[ - 'process_interface_shardsvr.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/transaction', - '$BUILD_DIR/mongo/db/write_ops', - '$BUILD_DIR/mongo/s/sharding_api', - 'process_interface_standalone', - 'sharded_agg_helpers', - ], -) - -env.Library( - target='mongos_process_interface', - source=[ - 'mongos_process_interface.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/pipeline/pipeline', - '$BUILD_DIR/mongo/s/query/async_results_merger', - '$BUILD_DIR/mongo/s/query/cluster_query', - 'mongo_process_common', - 'sharded_agg_helpers', - ], - LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/session_catalog', - ], -) - -env.Library( - target="process_interface_factory_mongod", - source=[ - "process_interface_factory_mongod.cpp", - ], - LIBDEPS_PRIVATE=[ - 'process_interface_shardsvr', - ], -) - -env.Library( target="document_path_support", source=[ 'document_path_support.cpp', @@ -469,10 +391,8 @@ env.CppUnitTest( 'granularity_rounder_powers_of_two_test.cpp', 'granularity_rounder_preferred_numbers_test.cpp', 'lookup_set_cache_test.cpp', - 'mongos_process_interface_test.cpp', 'pipeline_metadata_tree_test.cpp', 'pipeline_test.cpp', - 'process_interface_standalone_test.cpp', 'resume_token_test.cpp', 'semantic_analysis_test.cpp', 'sequential_document_cache_test.cpp', @@ -503,11 +423,9 @@ env.CppUnitTest( 'expression', 'field_path', 'granularity_rounder', - 'mongo_process_common', - 'mongo_process_interface', - 'mongos_process_interface', 'pipeline', - 'process_interface_shardsvr', - 'process_interface_standalone', + 'process_interface/mongod_process_interfaces', + 'process_interface/mongos_process_interface', + 'sharded_agg_helpers', ] ) diff --git a/src/mongo/db/pipeline/accumulator_js_test.cpp b/src/mongo/db/pipeline/accumulator_js_test.cpp index 41f12ac8898..6c5f2b6127b 100644 --- a/src/mongo/db/pipeline/accumulator_js_test.cpp +++ b/src/mongo/db/pipeline/accumulator_js_test.cpp @@ -35,7 +35,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/accumulator_js_reduce.h" #include "mongo/db/pipeline/expression_context_for_test.h" -#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/dbtests/dbtests.h" #include "mongo/scripting/engine.h" @@ -46,7 +46,8 @@ namespace { class MapReduceFixture : public ServiceContextMongoDTest { protected: MapReduceFixture() : _expCtx((new ExpressionContextForTest())) { - _expCtx->mongoProcessInterface = std::make_shared<MongoInterfaceStandalone>(_expCtx->opCtx); + _expCtx->mongoProcessInterface = + std::make_shared<NonShardServerProcessInterface>(_expCtx->opCtx); } boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() { diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 1e37a101baf..de0e757ab69 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -50,7 +50,7 @@ #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/transaction_history_iterator.h" diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index 38f8ad2b2f3..044afa7b5a9 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -40,8 +40,8 @@ #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/pipeline/resume_token.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/service_context.h" #include "mongo/unittest/death_test.h" diff --git a/src/mongo/db/pipeline/document_source_current_op_test.cpp b/src/mongo/db/pipeline/document_source_current_op_test.cpp index a2fb9ed833a..260bfbf0829 100644 --- a/src/mongo/db/pipeline/document_source_current_op_test.cpp +++ b/src/mongo/db/pipeline/document_source_current_op_test.cpp @@ -33,7 +33,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_current_op.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index a408be7580e..a3996c47480 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -37,7 +37,7 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_graph_lookup.h" #include "mongo/db/pipeline/document_source_mock.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 7f8d624848b..554d308d237 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -44,7 +44,7 @@ #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/field_path.h" -#include "mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h" +#include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h" namespace mongo { namespace { @@ -52,7 +52,7 @@ using boost::intrusive_ptr; using std::deque; using std::vector; -using MockMongoInterface = StubMongoProcessInterfaceLookupSingleDocument; +using MockMongoInterface = StubLookupSingleDocumentProcessInterface; // This provides access to getExpCtx(), but we'll use a different name for this test suite. class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture { diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index a4993f706ca..ae0f329e14f 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -43,7 +43,7 @@ #include "mongo/db/pipeline/document_source_lookup.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/field_path.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_mock.h" diff --git a/src/mongo/db/pipeline/document_source_merge_test.cpp b/src/mongo/db/pipeline/document_source_merge_test.cpp index f309915462f..f78f3195ec1 100644 --- a/src/mongo/db/pipeline/document_source_merge_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_test.cpp @@ -35,7 +35,7 @@ #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_merge.h" -#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" namespace mongo { namespace { diff --git a/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp b/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp index 47970d8c9e5..95f5c799b7c 100644 --- a/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp +++ b/src/mongo/db/pipeline/document_source_plan_cache_stats_test.cpp @@ -34,7 +34,7 @@ #include "mongo/bson/json.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_plan_cache_stats.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/unittest/unittest.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_union_with_test.cpp b/src/mongo/db/pipeline/document_source_union_with_test.cpp index a8fd3784573..90b6b9fefa4 100644 --- a/src/mongo/db/pipeline/document_source_union_with_test.cpp +++ b/src/mongo/db/pipeline/document_source_union_with_test.cpp @@ -46,14 +46,14 @@ #include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/document_source_union_with.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h" +#include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/intrusive_counter.h" namespace mongo { namespace { -using MockMongoInterface = StubMongoProcessInterfaceLookupSingleDocument; +using MockMongoInterface = StubLookupSingleDocumentProcessInterface; // This provides access to getExpCtx(), but we'll use a different name for this test suite. using DocumentSourceUnionWithTest = AggregationContextFixture; diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 1a9eb04e4ef..563b03569ab 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -32,7 +32,7 @@ #include <utility> #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/util/intrusive_counter.h" diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 6fde76c2c07..b0f4df41e05 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -43,7 +43,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/javascript_execution.h" -#include "mongo/db/pipeline/mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/mongo_process_interface.h" #include "mongo/db/pipeline/runtime_constants_gen.h" #include "mongo/db/pipeline/variables.h" #include "mongo/db/query/collation/collator_interface.h" diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index d716469f6e7..f5cbf88b799 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -32,7 +32,7 @@ #include <boost/optional.hpp> #include "mongo/db/pipeline/expression_context.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/query/datetime/date_time_support.h" #include "mongo/db/query/query_test_service_context.h" diff --git a/src/mongo/db/pipeline/expression_javascript_test.cpp b/src/mongo/db/pipeline/expression_javascript_test.cpp index e921d15fa65..43539f7bf75 100644 --- a/src/mongo/db/pipeline/expression_javascript_test.cpp +++ b/src/mongo/db/pipeline/expression_javascript_test.cpp @@ -33,7 +33,7 @@ #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/pipeline/expression_context_for_test.h" -#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/scripting/engine.h" @@ -46,7 +46,8 @@ class MapReduceFixture : public ServiceContextMongoDTest { protected: MapReduceFixture() : _expCtx((new ExpressionContextForTest())), _vps(_expCtx->variablesParseState) { - _expCtx->mongoProcessInterface = std::make_shared<MongoInterfaceStandalone>(_expCtx->opCtx); + _expCtx->mongoProcessInterface = + std::make_shared<NonShardServerProcessInterface>(_expCtx->opCtx); } boost::intrusive_ptr<ExpressionContextForTest>& getExpCtx() { diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index c94058843b2..a657912b6a7 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -54,9 +54,9 @@ #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" #include "mongo/db/pipeline/semantic_analysis.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/query/query_test_service_context.h" #include "mongo/db/repl/replication_coordinator_mock.h" diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript new file mode 100644 index 00000000000..0a19c65c996 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/SConscript @@ -0,0 +1,111 @@ +# -*- mode: python -*- + +Import('env') + +env = env.Clone() + +env.Library( + target='mongo_process_interface', + source=[ + 'mongo_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], +) + +env.Library( + target='common_process_interface', + source=[ + 'common_process_interface.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/auth/auth', + '$BUILD_DIR/mongo/db/generic_cursor', + '$BUILD_DIR/mongo/db/pipeline/field_path', + '$BUILD_DIR/mongo/s/sharding_router_api', + ], +) + +# This library is the basic mongod functionality, depended on by embedded so designed to exclude +# large components like sharding and networking. +env.Library( + target='mongod_process_interfaces', + source=[ + 'common_mongod_process_interface.cpp', + 'non_shardsvr_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/db/transaction', + '$BUILD_DIR/mongo/db/write_ops', + 'common_process_interface', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/catalog_helpers', + '$BUILD_DIR/mongo/db/catalog/database_holder', + '$BUILD_DIR/mongo/db/concurrency/flow_control_ticketholder', + '$BUILD_DIR/mongo/db/index_builds_coordinator_mongod', + '$BUILD_DIR/mongo/db/session_catalog', + '$BUILD_DIR/mongo/db/storage/backup_cursor_hooks', + '$BUILD_DIR/mongo/db/transaction', + '$BUILD_DIR/mongo/scripting/scripting_common', + ], +) + +env.Library( + target="shardsvr_process_interface", + source=[ + 'shardsvr_process_interface.cpp', + ], + LIBDEPS=[ + 'mongod_process_interfaces', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + '$BUILD_DIR/mongo/s/sharding_api', + ], +) + +env.Library( + target='mongod_process_interface_factory', + source=[ + 'mongod_process_interface_factory.cpp', + ], + LIBDEPS=[ + 'mongod_process_interfaces', + 'shardsvr_process_interface', + ], +) + +env.Library( + target='mongos_process_interface', + source=[ + 'mongos_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/pipeline/pipeline', + '$BUILD_DIR/mongo/s/query/async_results_merger', + '$BUILD_DIR/mongo/s/query/cluster_query', + 'common_process_interface', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/session_catalog', + '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + ], +) + +env.CppUnitTest( + target='process_interface_test', + source=[ + 'mongos_process_interface_test.cpp', + 'shardsvr_process_interface_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/query/query_test_service_context', + '$BUILD_DIR/mongo/db/service_context_test_fixture', + 'mongos_process_interface', + 'shardsvr_process_interface', + ] +) diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 57e41991379..483b20cd702 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" @@ -49,7 +49,6 @@ #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" #include "mongo/db/index/index_descriptor.h" -#include "mongo/db/index_builds_coordinator.h" #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" @@ -73,14 +72,6 @@ namespace mongo { -using boost::intrusive_ptr; -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using write_ops::Insert; -using write_ops::Update; -using write_ops::UpdateOpEntry; - namespace { class MongoDResourceYielder : public ResourceYielder { @@ -152,117 +143,26 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace -MongoInterfaceStandalone::MongoInterfaceStandalone(OperationContext* opCtx) {} +CommonMongodProcessInterface::CommonMongodProcessInterface(OperationContext* opCtx) {} std::unique_ptr<TransactionHistoryIteratorBase> -MongoInterfaceStandalone::createTransactionHistoryIterator(repl::OpTime time) const { +CommonMongodProcessInterface::createTransactionHistoryIterator(repl::OpTime time) const { bool permitYield = true; return std::unique_ptr<TransactionHistoryIteratorBase>( new TransactionHistoryIterator(time, permitYield)); } -bool MongoInterfaceStandalone::isSharded(OperationContext* opCtx, const NamespaceString& nss) { +bool CommonMongodProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { Lock::DBLock dbLock(opCtx, nss.db(), MODE_IS); Lock::CollectionLock collLock(opCtx, nss, MODE_IS); const auto metadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadata(); return metadata->isSharded(); } -Insert MongoInterfaceStandalone::buildInsertOp(const NamespaceString& nss, - std::vector<BSONObj>&& objs, - bool bypassDocValidation) { - Insert insertOp(nss); - insertOp.setDocuments(std::move(objs)); - insertOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(bypassDocValidation); - return wcb; - }()); - return insertOp; -} - -Update MongoInterfaceStandalone::buildUpdateOp( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - BatchedObjects&& batch, - UpsertType upsert, - bool multi) { - Update updateOp(nss); - updateOp.setUpdates([&] { - std::vector<UpdateOpEntry> updateEntries; - for (auto&& obj : batch) { - updateEntries.push_back([&] { - UpdateOpEntry entry; - auto&& [q, u, c] = obj; - entry.setQ(std::move(q)); - entry.setU(std::move(u)); - entry.setC(std::move(c)); - entry.setUpsert(upsert != UpsertType::kNone); - // TODO SERVER-44884: after branching for 4.5, remove the 'useNewUpsert' flag. - entry.setUpsertSupplied({{entry.getUpsert() && expCtx->useNewUpsert, - upsert == UpsertType::kInsertSuppliedDoc}}); - entry.setMulti(multi); - return entry; - }()); - } - return updateEntries; - }()); - updateOp.setWriteCommandBase([&] { - write_ops::WriteCommandBase wcb; - wcb.setOrdered(false); - wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation); - return wcb; - }()); - updateOp.setRuntimeConstants(expCtx->getRuntimeConstants()); - return updateOp; -} - -Status MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { - auto writeResults = performInserts( - expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); - - // Need to check each result in the batch since the writes are unordered. - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - } - return Status::OK(); -} - -StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceStandalone::update( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - BatchedObjects&& batch, - const WriteConcernOptions& wc, - UpsertType upsert, - bool multi, - boost::optional<OID> targetEpoch) { - auto writeResults = - performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); - - // Need to check each result in the batch since the writes are unordered. - UpdateResult updateResult; - for (const auto& result : writeResults.results) { - if (result.getStatus() != Status::OK()) { - return result.getStatus(); - } - - updateResult.nMatched += result.getValue().getN(); - updateResult.nModified += result.getValue().getNModified(); - } - return updateResult; -} - -std::vector<Document> MongoInterfaceStandalone::getIndexStats(OperationContext* opCtx, - const NamespaceString& ns, - StringData host, - bool addShardName) { +std::vector<Document> CommonMongodProcessInterface::getIndexStats(OperationContext* opCtx, + const NamespaceString& ns, + StringData host, + bool addShardName) { AutoGetCollectionForReadCommand autoColl(opCtx, ns); Collection* collection = autoColl.getCollection(); @@ -306,35 +206,29 @@ std::vector<Document> MongoInterfaceStandalone::getIndexStats(OperationContext* return indexStats; } -std::list<BSONObj> MongoInterfaceStandalone::getIndexSpecs(OperationContext* opCtx, - const NamespaceString& ns, - bool includeBuildUUIDs) { - return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs); -} - -void MongoInterfaceStandalone::appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const { +void CommonMongodProcessInterface::appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const { Top::get(opCtx->getServiceContext()).appendLatencyStats(nss, includeHistograms, builder); } -Status MongoInterfaceStandalone::appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const { +Status CommonMongodProcessInterface::appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const { return appendCollectionStorageStats(opCtx, nss, param, builder); } -Status MongoInterfaceStandalone::appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const { +Status CommonMongodProcessInterface::appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const { return appendCollectionRecordCount(opCtx, nss, builder); } -Status MongoInterfaceStandalone::appendQueryExecStats(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const { +Status CommonMongodProcessInterface::appendQueryExecStats(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const { AutoGetCollectionForReadCommand autoColl(opCtx, nss); if (!autoColl.getDb()) { @@ -365,8 +259,8 @@ Status MongoInterfaceStandalone::appendQueryExecStats(OperationContext* opCtx, return Status::OK(); } -BSONObj MongoInterfaceStandalone::getCollectionOptions(OperationContext* opCtx, - const NamespaceString& nss) { +BSONObj CommonMongodProcessInterface::getCollectionOptions(OperationContext* opCtx, + const NamespaceString& nss) { AutoGetCollectionForReadCommand autoColl(opCtx, nss); BSONObj collectionOptions = {}; if (!autoColl.getDb()) { @@ -383,67 +277,7 @@ BSONObj MongoInterfaceStandalone::getCollectionOptions(OperationContext* opCtx, return collectionOptions; } -void MongoInterfaceStandalone::renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) { - NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String()); - doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, - sourceNs, - targetNs, - renameCommandObj["dropTarget"].trueValue(), - renameCommandObj["stayTemp"].trueValue(), - originalIndexes, - originalCollectionOptions); -} - -void MongoInterfaceStandalone::createCollection(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj) { - uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj)); -} - -void MongoInterfaceStandalone::createIndexesOnEmptyCollection( - OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) { - AutoGetCollection autoColl(opCtx, ns, MODE_X); - writeConflictRetry( - opCtx, "MongoInterfaceStandalone::createIndexesOnEmptyCollection", ns.ns(), [&] { - auto collection = autoColl.getCollection(); - invariant(collection, - str::stream() << "Failed to create indexes for aggregation because " - "collection does not exist: " - << ns << ": " << BSON("indexes" << indexSpecs)); - - invariant(0U == collection->numRecords(opCtx), - str::stream() << "Expected empty collection for index creation: " << ns - << ": numRecords: " << collection->numRecords(opCtx) << ": " - << BSON("indexes" << indexSpecs)); - - // Secondary index builds do not filter existing indexes so we have to do this on the - // primary. - auto removeIndexBuildsToo = false; - auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes( - opCtx, indexSpecs, removeIndexBuildsToo); - if (filteredIndexes.empty()) { - return; - } - - WriteUnitOfWork wuow(opCtx); - IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( - opCtx, collection->uuid(), filteredIndexes, false // fromMigrate - ); - wuow.commit(); - }); -} -void MongoInterfaceStandalone::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { - BSONObjBuilder result; - uassertStatusOK(mongo::dropCollection( - opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); -} - -std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipeline( +std::unique_ptr<Pipeline, PipelineDeleter> CommonMongodProcessInterface::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { @@ -461,15 +295,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::makePipelin return pipeline; } -unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceStandalone::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* ownedPipeline, - bool allowTargetingShards) { - return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); -} - -unique_ptr<Pipeline, PipelineDeleter> -MongoInterfaceStandalone::attachCursorSourceToPipelineForLocalRead( +std::unique_ptr<Pipeline, PipelineDeleter> +CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); @@ -493,7 +320,7 @@ MongoInterfaceStandalone::attachCursorSourceToPipelineForLocalRead( return pipeline; } -std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) const { +std::string CommonMongodProcessInterface::getShardName(OperationContext* opCtx) const { if (ShardingState::get(opCtx)->enabled()) { return ShardingState::get(opCtx)->shardId().toString(); } @@ -501,24 +328,12 @@ std::string MongoInterfaceStandalone::getShardName(OperationContext* opCtx) cons return std::string(); } -std::pair<std::vector<FieldPath>, bool> -MongoInterfaceStandalone::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx, - const NamespaceString& nss, - UUID uuid) const { - return {{"_id"}, false}; // Nothing is sharded. -} - -std::vector<FieldPath> MongoInterfaceStandalone::collectDocumentKeyFieldsActingAsRouter( - OperationContext* opCtx, const NamespaceString& nss) const { - return {"_id"}; // Nothing is sharded. -} - -std::vector<GenericCursor> MongoInterfaceStandalone::getIdleCursors( - const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { +std::vector<GenericCursor> CommonMongodProcessInterface::getIdleCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { return CursorManager::get(expCtx->opCtx)->getIdleCursors(expCtx->opCtx, userMode); } -boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument( +boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID collectionUUID, @@ -574,7 +389,7 @@ boost::optional<Document> MongoInterfaceStandalone::lookupSingleDocument( return lookedUpDocument; } -BackupCursorState MongoInterfaceStandalone::openBackupCursor( +BackupCursorState CommonMongodProcessInterface::openBackupCursor( OperationContext* opCtx, const StorageEngine::BackupOptions& options) { auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); if (backupCursorHooks->enabled()) { @@ -584,7 +399,8 @@ BackupCursorState MongoInterfaceStandalone::openBackupCursor( } } -void MongoInterfaceStandalone::closeBackupCursor(OperationContext* opCtx, const UUID& backupId) { +void CommonMongodProcessInterface::closeBackupCursor(OperationContext* opCtx, + const UUID& backupId) { auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); if (backupCursorHooks->enabled()) { backupCursorHooks->closeBackupCursor(opCtx, backupId); @@ -593,9 +409,8 @@ void MongoInterfaceStandalone::closeBackupCursor(OperationContext* opCtx, const } } -BackupCursorExtendState MongoInterfaceStandalone::extendBackupCursor(OperationContext* opCtx, - const UUID& backupId, - const Timestamp& extendTo) { +BackupCursorExtendState CommonMongodProcessInterface::extendBackupCursor( + OperationContext* opCtx, const UUID& backupId, const Timestamp& extendTo) { auto backupCursorHooks = BackupCursorHooks::get(opCtx->getServiceContext()); if (backupCursorHooks->enabled()) { return backupCursorHooks->extendBackupCursor(opCtx, backupId, extendTo); @@ -604,7 +419,7 @@ BackupCursorExtendState MongoInterfaceStandalone::extendBackupCursor(OperationCo } } -std::vector<BSONObj> MongoInterfaceStandalone::getMatchingPlanCacheEntryStats( +std::vector<BSONObj> CommonMongodProcessInterface::getMatchingPlanCacheEntryStats( OperationContext* opCtx, const NamespaceString& nss, const MatchExpression* matchExp) const { const auto serializer = [](const PlanCacheEntry& entry) { BSONObjBuilder out; @@ -627,7 +442,7 @@ std::vector<BSONObj> MongoInterfaceStandalone::getMatchingPlanCacheEntryStats( return planCache->getMatchingStats(serializer, predicate); } -bool MongoInterfaceStandalone::fieldsHaveSupportingUniqueIndex( +bool CommonMongodProcessInterface::fieldsHaveSupportingUniqueIndex( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, const std::set<FieldPath>& fieldPaths) const { @@ -655,7 +470,7 @@ bool MongoInterfaceStandalone::fieldsHaveSupportingUniqueIndex( return false; } -BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( +BSONObj CommonMongodProcessInterface::_reportCurrentOpForClient( OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps, @@ -692,14 +507,13 @@ BSONObj MongoInterfaceStandalone::_reportCurrentOpForClient( return builder.obj(); } -void MongoInterfaceStandalone::_reportCurrentOpsForTransactionCoordinators( +void CommonMongodProcessInterface::_reportCurrentOpsForTransactionCoordinators( OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const { reportCurrentOpsForTransactionCoordinators(opCtx, includeIdle, ops); } -void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const { +void CommonMongodProcessInterface::_reportCurrentOpsForIdleSessions( + OperationContext* opCtx, CurrentOpUserMode userMode, std::vector<BSONObj>* ops) const { auto sessionCatalog = SessionCatalog::get(opCtx); const bool authEnabled = @@ -721,7 +535,7 @@ void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext }); } -std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefaultCollator( +std::unique_ptr<CollatorInterface> CommonMongodProcessInterface::_getCollectionDefaultCollator( OperationContext* opCtx, StringData dbName, UUID collectionUUID) { auto it = _collatorCache.find(collectionUUID); if (it == _collatorCache.end()) { @@ -745,13 +559,13 @@ std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefau return collator ? collator->clone() : nullptr; } -std::unique_ptr<ResourceYielder> MongoInterfaceStandalone::getResourceYielder() const { +std::unique_ptr<ResourceYielder> CommonMongodProcessInterface::getResourceYielder() const { return std::make_unique<MongoDResourceYielder>(); } std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> -MongoInterfaceStandalone::ensureFieldsUniqueOrResolveDocumentKey( +CommonMongodProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<std::vector<std::string>> fields, boost::optional<ChunkVersion> targetCollectionVersion, @@ -779,4 +593,54 @@ MongoInterfaceStandalone::ensureFieldsUniqueOrResolveDocumentKey( return {fieldPaths, targetCollectionVersion}; } +write_ops::Insert CommonMongodProcessInterface::buildInsertOp(const NamespaceString& nss, + std::vector<BSONObj>&& objs, + bool bypassDocValidation) { + write_ops::Insert insertOp(nss); + insertOp.setDocuments(std::move(objs)); + insertOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(false); + wcb.setBypassDocumentValidation(bypassDocValidation); + return wcb; + }()); + return insertOp; +} + +Update CommonMongodProcessInterface::buildUpdateOp( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + BatchedObjects&& batch, + UpsertType upsert, + bool multi) { + Update updateOp(nss); + updateOp.setUpdates([&] { + std::vector<write_ops::UpdateOpEntry> updateEntries; + for (auto&& obj : batch) { + updateEntries.push_back([&] { + write_ops::UpdateOpEntry entry; + auto&& [q, u, c] = obj; + entry.setQ(std::move(q)); + entry.setU(std::move(u)); + entry.setC(std::move(c)); + entry.setUpsert(upsert != UpsertType::kNone); + // TODO SERVER-44884: after branching for 4.5, remove the 'useNewUpsert' flag. + entry.setUpsertSupplied({{entry.getUpsert() && expCtx->useNewUpsert, + upsert == UpsertType::kInsertSuppliedDoc}}); + entry.setMulti(multi); + return entry; + }()); + } + return updateEntries; + }()); + updateOp.setWriteCommandBase([&] { + write_ops::WriteCommandBase wcb; + wcb.setOrdered(false); + wcb.setBypassDocumentValidation(expCtx->bypassDocumentValidation); + return wcb; + }()); + updateOp.setRuntimeConstants(expCtx->getRuntimeConstants()); + return updateOp; +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index a8f098ee556..f8058e18d89 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -34,8 +34,8 @@ #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/javascript_execution.h" -#include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/common_process_interface.h" namespace mongo { @@ -43,16 +43,14 @@ using write_ops::Insert; using write_ops::Update; /** - * Class to provide access to mongod-specific implementations of methods required by some - * document sources. + * Provides the implementations of interfaces that are shared across different types of mongod + * nodes. */ -class MongoInterfaceStandalone : public MongoProcessCommon { +class CommonMongodProcessInterface : public CommonProcessInterface { public: - // static std::shared_ptr<MongoProcessInterface> create(OperationContext* opCtx); + CommonMongodProcessInterface(OperationContext* opCtx); - MongoInterfaceStandalone(OperationContext* opCtx); - - virtual ~MongoInterfaceStandalone() = default; + virtual ~CommonMongodProcessInterface() = default; std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator( repl::OpTime time) const final; @@ -62,27 +60,12 @@ public: * sending request against nss based on this information. */ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; - Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) override; - StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - BatchedObjects&& batch, - const WriteConcernOptions& wc, - UpsertType upsert, - bool multi, - boost::optional<OID> targetEpoch) override; std::vector<Document> getIndexStats(OperationContext* opCtx, const NamespaceString& ns, StringData host, bool addShardName) final; - std::list<BSONObj> getIndexSpecs(OperationContext* opCtx, - const NamespaceString& ns, - bool includeBuildUUIDs); void appendLatencyStats(OperationContext* opCtx, const NamespaceString& nss, bool includeHistograms, @@ -98,39 +81,14 @@ public: const NamespaceString& nss, BSONObjBuilder* builder) const final override; BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final; - void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes); - void createCollection(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj); - void createIndexesOnEmptyCollection(OperationContext* opCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& indexSpecs); - void dropCollection(OperationContext* opCtx, const NamespaceString& collection); std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; - std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline, - bool allowTargetingShards = true) override; std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override; + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; - std::unique_ptr<ShardFilterer> getShardFilterer( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { - // We'll never do shard filtering on a standalone. - return nullptr; - } - std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( - OperationContext* opCtx, const NamespaceString&, UUID) const override; - std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( - OperationContext* opCtx, const NamespaceString&) const override; boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -155,38 +113,15 @@ public: const NamespaceString& nss, const std::set<FieldPath>& fieldPaths) const; - boost::optional<ChunkVersion> refreshAndGetCollectionVersion( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss) const final { - return boost::none; // Nothing is sharded here. - } - virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - ChunkVersion targetCollectionVersion) const override { - uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr"); - } - - std::unique_ptr<ResourceYielder> getResourceYielder() const override; + std::unique_ptr<ResourceYielder> getResourceYielder() const final; std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> ensureFieldsUniqueOrResolveDocumentKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<std::vector<std::string>> fields, boost::optional<ChunkVersion> targetCollectionVersion, - const NamespaceString& outputNs) const override; + const NamespaceString& outputNs) const final; protected: - BSONObj _reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps, - CurrentOpBacktraceMode backtraceMode) const final; - - void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const final; - - void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx, - bool includeIdle, - std::vector<BSONObj>* ops) const final; /** * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. */ @@ -203,6 +138,19 @@ protected: UpsertType upsert, bool multi); + BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps, + CurrentOpBacktraceMode backtraceMode) const final; + + void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const final; + + void _reportCurrentOpsForTransactionCoordinators(OperationContext* opCtx, + bool includeIdle, + std::vector<BSONObj>* ops) const final; + private: /** * Looks up the collection default collator for the collection given by 'collectionUUID'. A diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp index bb5466b9f8f..9d5acc0bb10 100644 --- a/src/mongo/db/pipeline/mongo_process_common.cpp +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/mongo_process_common.h" +#include "mongo/db/pipeline/process_interface/common_process_interface.h" #include "mongo/bson/mutable/document.h" #include "mongo/db/auth/authorization_manager.h" @@ -51,7 +51,7 @@ namespace mongo { -std::vector<BSONObj> MongoProcessCommon::getCurrentOps( +std::vector<BSONObj> CommonProcessInterface::getCurrentOps( const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpConnectionsMode connMode, CurrentOpSessionsMode sessionMode, @@ -129,7 +129,7 @@ std::vector<BSONObj> MongoProcessCommon::getCurrentOps( return ops; } -std::vector<FieldPath> MongoProcessCommon::collectDocumentKeyFieldsActingAsRouter( +std::vector<FieldPath> CommonProcessInterface::collectDocumentKeyFieldsActingAsRouter( OperationContext* opCtx, const NamespaceString& nss) const { if (auto chunkManager = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)) @@ -141,8 +141,8 @@ std::vector<FieldPath> MongoProcessCommon::collectDocumentKeyFieldsActingAsRoute return {"_id"}; } -bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern, - const std::set<FieldPath>& uniqueKeyPaths) { +bool CommonProcessInterface::keyPatternNamesExactPaths( + const BSONObj& keyPattern, const std::set<FieldPath>& uniqueKeyPaths) { size_t nFieldsMatched = 0; for (auto&& elem : keyPattern) { if (!elem.isNumber()) { @@ -156,7 +156,7 @@ bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern, return nFieldsMatched == uniqueKeyPaths.size(); } -boost::optional<ChunkVersion> MongoProcessCommon::refreshAndGetCollectionVersion( +boost::optional<ChunkVersion> CommonProcessInterface::refreshAndGetCollectionVersion( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const { const bool forceRefreshFromThisThread = false; auto routingInfo = uassertStatusOK( @@ -169,7 +169,7 @@ boost::optional<ChunkVersion> MongoProcessCommon::refreshAndGetCollectionVersion return boost::none; } -std::vector<FieldPath> MongoProcessCommon::_shardKeyToDocumentKeyFields( +std::vector<FieldPath> CommonProcessInterface::_shardKeyToDocumentKeyFields( const std::vector<std::unique_ptr<FieldRef>>& keyPatternFields) const { std::vector<FieldPath> result; bool gotId = false; @@ -183,7 +183,7 @@ std::vector<FieldPath> MongoProcessCommon::_shardKeyToDocumentKeyFields( return result; } -std::set<FieldPath> MongoProcessCommon::_convertToFieldPaths( +std::set<FieldPath> CommonProcessInterface::_convertToFieldPaths( const std::vector<std::string>& fields) const { std::set<FieldPath> fieldPaths; @@ -196,7 +196,7 @@ std::set<FieldPath> MongoProcessCommon::_convertToFieldPaths( return fieldPaths; } -std::string MongoProcessCommon::getHostAndPort(OperationContext* opCtx) const { +std::string CommonProcessInterface::getHostAndPort(OperationContext* opCtx) const { return getHostNameCachedAndPort(); } diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/process_interface/common_process_interface.h index 0b6cd75c764..7f68f8d201f 100644 --- a/src/mongo/db/pipeline/mongo_process_common.h +++ b/src/mongo/db/pipeline/process_interface/common_process_interface.h @@ -32,17 +32,17 @@ #include <vector> #include "mongo/bson/bsonobj.h" -#include "mongo/db/pipeline/mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/mongo_process_interface.h" namespace mongo { /** - * MongoProcessCommon provides base implementations of any MongoProcessInterface methods whose code - * is largely identical on mongoD and mongoS. + * CommonProcessInterface provides base implementations of any MongoProcessInterface methods + * whose code is largely identical on mongoD and mongoS. */ -class MongoProcessCommon : public MongoProcessInterface { +class CommonProcessInterface : public MongoProcessInterface { public: - virtual ~MongoProcessCommon() = default; + virtual ~CommonProcessInterface() = default; /** * Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and @@ -80,7 +80,7 @@ protected: /** * Returns a BSONObj representing a report of the operation which is currently being * executed by the supplied client. This method is called by the getCurrentOps method of - * MongoProcessCommon to delegate to the mongoS- or mongoD- specific implementation. + * CommonProcessInterface to delegate to the mongoS- or mongoD- specific implementation. */ virtual BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/mongo_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongo_process_interface.cpp index 69cb07aeb87..46101c78629 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/mongo_process_interface.h" #include "mongo/base/shim.h" diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index c4d5267f452..c4d5267f452 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h diff --git a/src/mongo/db/pipeline/process_interface_factory_mongod.cpp b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp index d8c3de62131..3845cfcd3f2 100644 --- a/src/mongo/db/pipeline/process_interface_factory_mongod.cpp +++ b/src/mongo/db/pipeline/process_interface/mongod_process_interface_factory.cpp @@ -30,15 +30,18 @@ #include "mongo/platform/basic.h" #include "mongo/base/shim.h" -#include "mongo/db/pipeline/process_interface_shardsvr.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" +#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" #include "mongo/db/s/sharding_state.h" namespace mongo { namespace { std::shared_ptr<MongoProcessInterface> MongoProcessInterfaceCreateImpl(OperationContext* opCtx) { - return ShardingState::get(opCtx)->enabled() ? std::make_shared<MongoInterfaceShardServer>(opCtx) - : std::make_shared<MongoInterfaceStandalone>(opCtx); + if (ShardingState::get(opCtx)->enabled()) { + return std::make_shared<ShardServerProcessInterface>(opCtx); + } + return std::make_shared<NonShardServerProcessInterface>(opCtx); } auto mongoProcessInterfaceCreateRegistration = MONGO_WEAK_FUNCTION_REGISTRATION( diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 29dc6f070db..938006f7f8a 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/db/pipeline/process_interface/mongos_process_interface.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection_catalog.h" @@ -93,14 +93,14 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, auto isIdIndex = index[IndexDescriptor::kIndexNameFieldName].String() == "_id_"; return (isIdIndex || index.getBoolField(IndexDescriptor::kUniqueFieldName)) && !index.hasField(IndexDescriptor::kPartialFilterExprFieldName) && - MongoProcessCommon::keyPatternNamesExactPaths( + CommonProcessInterface::keyPatternNamesExactPaths( index.getObjectField(IndexDescriptor::kKeyPatternFieldName), uniqueKeyPaths) && CollatorInterface::collatorsMatch(collation.get(), expCtx->getCollator()); } } // namespace -std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline( +std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions pipelineOptions) { @@ -119,7 +119,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::makePipeline( return pipeline; } -std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceToPipeline( +std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, bool allowTargetingShards) { @@ -127,7 +127,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> MongoSInterface::attachCursorSourceTo return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline); } -boost::optional<Document> MongoSInterface::lookupSingleDocument( +boost::optional<Document> MongosProcessInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID collectionUUID, @@ -241,10 +241,11 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument( return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{}); } -BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps, - CurrentOpBacktraceMode backtraceMode) const { +BSONObj MongosProcessInterface::_reportCurrentOpForClient( + OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps, + CurrentOpBacktraceMode backtraceMode) const { BSONObjBuilder builder; CurOp::reportCurrentOpForClient(opCtx, @@ -264,9 +265,9 @@ BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx, return builder.obj(); } -void MongoSInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const { +void MongosProcessInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const { auto sessionCatalog = SessionCatalog::get(opCtx); const bool authEnabled = @@ -291,10 +292,10 @@ void MongoSInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, }); } -void MongoSInterface::_reportCurrentOpsForTransactionCoordinators( +void MongosProcessInterface::_reportCurrentOpsForTransactionCoordinators( OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {}; -std::vector<GenericCursor> MongoSInterface::getIdleCursors( +std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { invariant(hasGlobalServiceContext()); auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); @@ -302,12 +303,12 @@ std::vector<GenericCursor> MongoSInterface::getIdleCursors( return cursorManager->getIdleCursors(expCtx->opCtx, userMode); } -bool MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { +bool MongosProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { auto routingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); return routingInfo.isOK() && routingInfo.getValue().cm(); } -bool MongoSInterface::fieldsHaveSupportingUniqueIndex( +bool MongosProcessInterface::fieldsHaveSupportingUniqueIndex( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, const std::set<FieldPath>& fieldPaths) const { @@ -336,7 +337,7 @@ bool MongoSInterface::fieldsHaveSupportingUniqueIndex( } std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> -MongoSInterface::ensureFieldsUniqueOrResolveDocumentKey( +MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<std::vector<std::string>> fields, boost::optional<ChunkVersion> targetCollectionVersion, diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 8d93190cbcb..36abd00c6da 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -30,8 +30,8 @@ #pragma once #include "mongo/db/exec/shard_filterer.h" -#include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/common_process_interface.h" namespace mongo { @@ -39,11 +39,11 @@ namespace mongo { * Class to provide access to mongos-specific implementations of methods required by some * document sources. */ -class MongoSInterface : public MongoProcessCommon { +class MongosProcessInterface : public CommonProcessInterface { public: - MongoSInterface() = default; + MongosProcessInterface() = default; - virtual ~MongoSInterface() = default; + virtual ~MongosProcessInterface() = default; boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, diff --git a/src/mongo/db/pipeline/mongos_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp index 91d79185481..63d69d93eab 100644 --- a/src/mongo/db/pipeline/mongos_process_interface_test.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface_test.cpp @@ -30,15 +30,15 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/mongos_process_interface.h" +#include "mongo/db/pipeline/process_interface/mongos_process_interface.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { -class MongoProcessInterfaceForTest : public MongoSInterface { +class MongosProcessInterfaceForTest : public MongosProcessInterface { public: - using MongoSInterface::MongoSInterface; + using MongosProcessInterface::MongosProcessInterface; bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -49,18 +49,18 @@ public: bool hasSupportingIndexForFields{true}; }; -class MongoSInterfaceTest : public AggregationContextFixture { +class MongosProcessInterfaceTest : public AggregationContextFixture { public: - MongoSInterfaceTest() { + MongosProcessInterfaceTest() { getExpCtx()->inMongos = true; } auto makeProcessInterface() { - return std::make_unique<MongoProcessInterfaceForTest>(); + return std::make_unique<MongosProcessInterfaceForTest>(); } }; -TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfTargetCollectionVersionIsSpecified) { +TEST_F(MongosProcessInterfaceTest, FailsToEnsureFieldsUniqueIfTargetCollectionVersionIsSpecified) { auto expCtx = getExpCtx(); auto targetCollectionVersion = boost::make_optional(ChunkVersion(0, 0, OID::gen())); auto processInterface = makeProcessInterface(); @@ -71,7 +71,7 @@ TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfTargetCollectionVersionIs 51179); } -TEST_F(MongoSInterfaceTest, FailsToEnsureFieldsUniqueIfNotSupportedByIndex) { +TEST_F(MongosProcessInterfaceTest, FailsToEnsureFieldsUniqueIfNotSupportedByIndex) { auto expCtx = getExpCtx(); auto targetCollectionVersion = boost::none; auto processInterface = makeProcessInterface(); diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp new file mode 100644 index 00000000000..5b2f64e4802 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" + +#include "mongo/db/catalog/create_collection.h" +#include "mongo/db/catalog/drop_collection.h" +#include "mongo/db/catalog/list_indexes.h" +#include "mongo/db/catalog/rename_collection.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/index_builds_coordinator.h" + +namespace mongo { + +Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { + auto writeResults = performInserts( + expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + + // Need to check each result in the batch since the writes are unordered. + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + } + return Status::OK(); +} + +StatusWith<MongoProcessInterface::UpdateResult> NonShardServerProcessInterface::update( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + BatchedObjects&& batch, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) { + auto writeResults = + performUpdates(expCtx->opCtx, buildUpdateOp(expCtx, ns, std::move(batch), upsert, multi)); + + // Need to check each result in the batch since the writes are unordered. + UpdateResult updateResult; + for (const auto& result : writeResults.results) { + if (result.getStatus() != Status::OK()) { + return result.getStatus(); + } + + updateResult.nMatched += result.getValue().getN(); + updateResult.nModified += result.getValue().getNModified(); + } + return updateResult; +} + +std::list<BSONObj> NonShardServerProcessInterface::getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) { + return listIndexesEmptyListIfMissing(opCtx, ns, includeBuildUUIDs); +} + +void NonShardServerProcessInterface::createIndexesOnEmptyCollection( + OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) { + AutoGetCollection autoColl(opCtx, ns, MODE_X); + writeConflictRetry( + opCtx, "CommonMongodProcessInterface::createIndexesOnEmptyCollection", ns.ns(), [&] { + auto collection = autoColl.getCollection(); + invariant(collection, + str::stream() << "Failed to create indexes for aggregation because " + "collection does not exist: " + << ns << ": " << BSON("indexes" << indexSpecs)); + + invariant(0U == collection->numRecords(opCtx), + str::stream() << "Expected empty collection for index creation: " << ns + << ": numRecords: " << collection->numRecords(opCtx) << ": " + << BSON("indexes" << indexSpecs)); + + // Secondary index builds do not filter existing indexes so we have to do this on the + // primary. + auto removeIndexBuildsToo = false; + auto filteredIndexes = collection->getIndexCatalog()->removeExistingIndexes( + opCtx, indexSpecs, removeIndexBuildsToo); + if (filteredIndexes.empty()) { + return; + } + + WriteUnitOfWork wuow(opCtx); + IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection( + opCtx, collection->uuid(), filteredIndexes, false // fromMigrate + ); + wuow.commit(); + }); +} +void NonShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) { + NamespaceString sourceNs = NamespaceString(renameCommandObj["renameCollection"].String()); + doLocalRenameIfOptionsAndIndexesHaveNotChanged(opCtx, + sourceNs, + targetNs, + renameCommandObj["dropTarget"].trueValue(), + renameCommandObj["stayTemp"].trueValue(), + originalIndexes, + originalCollectionOptions); +} + +void NonShardServerProcessInterface::createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { + uassertStatusOK(mongo::createCollection(opCtx, dbName, cmdObj)); +} + +void NonShardServerProcessInterface::dropCollection(OperationContext* opCtx, + const NamespaceString& ns) { + BSONObjBuilder result; + uassertStatusOK(mongo::dropCollection( + opCtx, ns, result, {}, DropCollectionSystemCollectionMode::kDisallowSystemCollectionDrops)); +} + +std::unique_ptr<Pipeline, PipelineDeleter> +NonShardServerProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); +} + +std::pair<std::vector<FieldPath>, bool> +NonShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection( + OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const { + return {{"_id"}, false}; // Nothing is sharded. +} + +std::vector<FieldPath> NonShardServerProcessInterface::collectDocumentKeyFieldsActingAsRouter( + OperationContext* opCtx, const NamespaceString& nss) const { + return {"_id"}; // Nothing is sharded. +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h new file mode 100644 index 00000000000..cd58220c9f0 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/client.h" +#include "mongo/db/exec/shard_filterer.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h" + +namespace mongo { + +/** + * Class to provide access to mongod-specific implementations of methods required by some + * document sources. + */ +class NonShardServerProcessInterface : public CommonMongodProcessInterface { +public: + using CommonMongodProcessInterface::CommonMongodProcessInterface; + + virtual ~NonShardServerProcessInterface() = default; + + Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) override; + StatusWith<UpdateResult> update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + BatchedObjects&& batch, + const WriteConcernOptions& wc, + UpsertType upsert, + bool multi, + boost::optional<OID> targetEpoch) override; + + std::list<BSONObj> getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs); + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes); + void createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj); + void dropCollection(OperationContext* opCtx, const NamespaceString& collection); + void createIndexesOnEmptyCollection(OperationContext* opCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& indexSpecs); + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline, + bool allowTargetingShards) override; + std::unique_ptr<ShardFilterer> getShardFilterer( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { + // We'll never do shard filtering on a standalone. + return nullptr; + } + + boost::optional<ChunkVersion> refreshAndGetCollectionVersion( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss) const final { + return boost::none; // Nothing is sharded here. + } + + virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + ChunkVersion targetCollectionVersion) const override { + uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr"); + } + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection( + OperationContext* opCtx, const NamespaceString&, UUID) const final; + + std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( + OperationContext*, const NamespaceString&) const final; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index 843d37019b5..dcd5179def4 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -31,16 +31,9 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/process_interface_shardsvr.h" - -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/catalog/collection.h" -#include "mongo/db/catalog/collection_catalog.h" -#include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" -#include "mongo/db/commands.h" +#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" + #include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/shard_filterer_impl.h" #include "mongo/db/ops/write_ops_exec.h" @@ -80,7 +73,7 @@ void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOption } // namespace -void MongoInterfaceShardServer::checkRoutingInfoEpochOrThrow( +void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, ChunkVersion targetCollectionVersion) const { @@ -91,9 +84,9 @@ void MongoInterfaceShardServer::checkRoutingInfoEpochOrThrow( } std::pair<std::vector<FieldPath>, bool> -MongoInterfaceShardServer::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx, - const NamespaceString& nss, - UUID uuid) const { +ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx, + const NamespaceString& nss, + UUID uuid) const { invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); const auto metadata = [opCtx, &nss]() -> ScopedCollectionMetadata { @@ -115,11 +108,11 @@ MongoInterfaceShardServer::collectDocumentKeyFieldsForHostedCollection(Operation return {_shardKeyToDocumentKeyFields(metadata->getKeyPatternFields()), true}; } -Status MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - std::vector<BSONObj>&& objs, - const WriteConcernOptions& wc, - boost::optional<OID> targetEpoch) { +Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc, + boost::optional<OID> targetEpoch) { BatchedCommandResponse response; BatchWriteExecStats stats; @@ -134,7 +127,7 @@ Status MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCo return response.toStatus(); } -StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceShardServer::update( +StatusWith<MongoProcessInterface::UpdateResult> ShardServerProcessInterface::update( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, BatchedObjects&& batch, @@ -158,7 +151,7 @@ StatusWith<MongoProcessInterface::UpdateResult> MongoInterfaceShardServer::updat return {{response.getN(), response.getNModified()}}; } -unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSourceToPipeline( +unique_ptr<Pipeline, PipelineDeleter> ShardServerProcessInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, bool allowTargetingShards) { @@ -175,7 +168,7 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSou return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, pipeline.release()); } -std::unique_ptr<ShardFilterer> MongoInterfaceShardServer::getShardFilterer( +std::unique_ptr<ShardFilterer> ShardServerProcessInterface::getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const { const bool aggNsIsCollection = expCtx->uuid != boost::none; auto collectionFilter = CollectionShardingState::get(expCtx->opCtx, expCtx->ns) @@ -183,7 +176,7 @@ std::unique_ptr<ShardFilterer> MongoInterfaceShardServer::getShardFilterer( return std::make_unique<ShardFiltererImpl>(std::move(collectionFilter)); } -void MongoInterfaceShardServer::renameIfOptionsAndIndexesHaveNotChanged( +void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( OperationContext* opCtx, const BSONObj& renameCommandObj, const NamespaceString& destinationNs, @@ -222,9 +215,9 @@ void MongoInterfaceShardServer::renameIfOptionsAndIndexesHaveNotChanged( str::stream() << "failed while running command " << newCmdObj); } -std::list<BSONObj> MongoInterfaceShardServer::getIndexSpecs(OperationContext* opCtx, - const NamespaceString& ns, - bool includeBuildUUIDs) { +std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext* opCtx, + const NamespaceString& ns, + bool includeBuildUUIDs) { auto cachedDbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db())); auto shard = uassertStatusOK( @@ -243,9 +236,9 @@ std::list<BSONObj> MongoInterfaceShardServer::getIndexSpecs(OperationContext* op } return std::list<BSONObj>(indexes.docs.begin(), indexes.docs.end()); } -void MongoInterfaceShardServer::createCollection(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& cmdObj) { +void ShardServerProcessInterface::createCollection(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj) { auto cachedDbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, dbName)); BSONObj finalCmdObj = cmdObj; @@ -271,7 +264,7 @@ void MongoInterfaceShardServer::createCollection(OperationContext* opCtx, << "write concern failed while running command " << finalCmdObj); } -void MongoInterfaceShardServer::createIndexesOnEmptyCollection( +void ShardServerProcessInterface::createIndexesOnEmptyCollection( OperationContext* opCtx, const NamespaceString& ns, const std::vector<BSONObj>& indexSpecs) { auto cachedDbInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, ns.db())); @@ -299,7 +292,8 @@ void MongoInterfaceShardServer::createIndexesOnEmptyCollection( str::stream() << "write concern failed while running command " << cmdObj); } -void MongoInterfaceShardServer::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { +void ShardServerProcessInterface::dropCollection(OperationContext* opCtx, + const NamespaceString& ns) { // Build and execute the dropCollection command against the primary shard of the given // database. auto cachedDbInfo = diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index 66098e7377e..a7fa2b9198d 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -31,16 +31,16 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/db/pipeline/process_interface/common_mongod_process_interface.h" namespace mongo { /** * Specialized version of the MongoDInterface when this node is a shard server. */ -class MongoInterfaceShardServer final : public MongoInterfaceStandalone { +class ShardServerProcessInterface final : public CommonMongodProcessInterface { public: - using MongoInterfaceStandalone::MongoInterfaceStandalone; + using CommonMongodProcessInterface::CommonMongodProcessInterface; void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface_standalone_test.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp index e522111e395..14df45018df 100644 --- a/src/mongo/db/pipeline/process_interface_standalone_test.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp @@ -30,15 +30,15 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/process_interface_standalone.h" +#include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { -class MongoProcessInterfaceForTest : public MongoInterfaceStandalone { +class MongoProcessInterfaceForTest : public NonShardServerProcessInterface { public: - using MongoInterfaceStandalone::MongoInterfaceStandalone; + using NonShardServerProcessInterface::NonShardServerProcessInterface; bool fieldsHaveSupportingUniqueIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp index 9d979f1c308..cba2a97ffcc 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.cpp +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h" +#include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -37,8 +37,7 @@ namespace mongo { -std::unique_ptr<Pipeline, PipelineDeleter> -StubMongoProcessInterfaceLookupSingleDocument::makePipeline( +std::unique_ptr<Pipeline, PipelineDeleter> StubLookupSingleDocumentProcessInterface::makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts) { @@ -57,7 +56,7 @@ StubMongoProcessInterfaceLookupSingleDocument::makePipeline( } std::unique_ptr<Pipeline, PipelineDeleter> -StubMongoProcessInterfaceLookupSingleDocument::attachCursorSourceToPipelineForLocalRead( +StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRead( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline) { std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); @@ -66,15 +65,14 @@ StubMongoProcessInterfaceLookupSingleDocument::attachCursorSourceToPipelineForLo } std::unique_ptr<Pipeline, PipelineDeleter> -StubMongoProcessInterfaceLookupSingleDocument::attachCursorSourceToPipeline( +StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* ownedPipeline, bool allowTargetingShards) { return attachCursorSourceToPipelineForLocalRead(expCtx, ownedPipeline); } - -boost::optional<Document> StubMongoProcessInterfaceLookupSingleDocument::lookupSingleDocument( +boost::optional<Document> StubLookupSingleDocumentProcessInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID collectionUUID, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h index 1a4956ce97d..421fb759ae2 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface_lookup_single_document.h +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h @@ -37,7 +37,7 @@ #include "mongo/db/exec/shard_filterer.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/pipeline.h" -#include "mongo/db/pipeline/stub_mongo_process_interface.h" +#include "mongo/db/pipeline/process_interface/stub_mongo_process_interface.h" namespace mongo { @@ -62,10 +62,9 @@ public: /** * A mock MongoProcessInterface which allows mocking a foreign pipeline. */ -class StubMongoProcessInterfaceLookupSingleDocument final : public StubMongoProcessInterface { +class StubLookupSingleDocumentProcessInterface final : public StubMongoProcessInterface { public: - StubMongoProcessInterfaceLookupSingleDocument( - std::deque<DocumentSource::GetNextResult> mockResults) + StubLookupSingleDocumentProcessInterface(std::deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index 5e7d0754888..69d7b5eec49 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -29,8 +29,8 @@ #pragma once -#include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/process_interface/mongo_process_interface.h" #include "mongo/util/assert_util.h" |