diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-01-17 17:40:35 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-03-07 14:01:39 -0500 |
commit | 083647f38662195653b87b6a79ae1183d269f910 (patch) | |
tree | 1cc2a3b7a036232da1a3c1bc96bc5c39d9d90551 /src/mongo/db | |
parent | 4d2ca242fb7b9a28d1123831db99664eb0db3e23 (diff) | |
download | mongo-083647f38662195653b87b6a79ae1183d269f910.tar.gz |
SERVER-29908 Move OpObserver callbacks out of CollectionShardingState
Diffstat (limited to 'src/mongo/db')
25 files changed, 938 insertions, 774 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 540b4ac0a0d..e3bbf70a02f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1519,8 +1519,6 @@ env.CppUnitTest( ], LIBDEPS=[ 'keys_collection_document', - '$BUILD_DIR/mongo/s/coreshard', - '$BUILD_DIR/mongo/s/client/sharding_client', ], ) @@ -1562,10 +1560,9 @@ env.CppUnitTest( 'logical_time_validator_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/s/config_server_test_fixture', 'keys_collection_manager', 'logical_time_validator', - '$BUILD_DIR/mongo/s/config_server_test_fixture', - '$BUILD_DIR/mongo/s/coreshard', ], ) @@ -1605,32 +1602,10 @@ env.CppUnitTest( ) env.CppUnitTest( - target='keys_collection_cache_test', - source=[ - 'keys_collection_cache_test.cpp', - ], - LIBDEPS=[ - 'keys_collection_manager', - '$BUILD_DIR/mongo/s/config_server_test_fixture', - '$BUILD_DIR/mongo/s/coreshard', - ], -) - -env.CppUnitTest( - target='key_generator_update_test', - source=[ - 'key_generator_update_test.cpp', - ], - LIBDEPS=[ - 'keys_collection_manager', - '$BUILD_DIR/mongo/s/config_server_test_fixture', - '$BUILD_DIR/mongo/s/coreshard', - ], -) - -env.CppUnitTest( target='keys_collection_manager_sharding_test', source=[ + 'key_generator_update_test.cpp', + 'keys_collection_cache_test.cpp', 'keys_collection_manager_sharding_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 8e27f6a2c26..683d2f3e347 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -108,6 +108,8 @@ #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/s/balancer/balancer.h" #include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/db/s/config_server_op_observer.h" +#include "mongo/db/s/shard_server_op_observer.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state.h" @@ -271,6 +273,13 @@ ExitCode _initAndListen(int listenPort) { auto opObserverRegistry = stdx::make_unique<OpObserverRegistry>(); opObserverRegistry->addObserver(stdx::make_unique<OpObserverImpl>()); opObserverRegistry->addObserver(stdx::make_unique<UUIDCatalogObserver>()); + + if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + opObserverRegistry->addObserver(stdx::make_unique<ShardServerOpObserver>()); + } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + opObserverRegistry->addObserver(stdx::make_unique<ConfigServerOpObserver>()); + } + serviceContext->setOpObserver(std::move(opObserverRegistry)); DBDirectClientFactory::get(serviceContext).registerImplementation([](OperationContext* opCtx) { diff --git a/src/mongo/db/key_generator_update_test.cpp b/src/mongo/db/key_generator_update_test.cpp index ee33e8204bf..5c30b3214eb 100644 --- a/src/mongo/db/key_generator_update_test.cpp +++ b/src/mongo/db/key_generator_update_test.cpp @@ -37,10 +37,7 @@ #include "mongo/db/keys_collection_document.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/config_server_test_fixture.h" -#include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" @@ -59,12 +56,6 @@ protected: Grid::get(operationContext())->catalogClient()); } - std::unique_ptr<DistLockManager> makeDistLockManager( - std::unique_ptr<DistLockCatalog> distLockCatalog) override { - invariant(distLockCatalog); - return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); - } - KeysCollectionClient* catalogClient() const { return _catalogClient.get(); } diff --git a/src/mongo/db/keys_collection_manager_sharding_test.cpp b/src/mongo/db/keys_collection_manager_sharding_test.cpp index 4653653c836..c97ca659dd6 100644 --- a/src/mongo/db/keys_collection_manager_sharding_test.cpp +++ b/src/mongo/db/keys_collection_manager_sharding_test.cpp @@ -37,10 +37,7 @@ #include "mongo/db/keys_collection_manager.h" #include "mongo/db/logical_clock.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/config_server_test_fixture.h" -#include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" @@ -76,12 +73,6 @@ protected: ConfigServerTestFixture::tearDown(); } - std::unique_ptr<DistLockManager> makeDistLockManager( - std::unique_ptr<DistLockCatalog> distLockCatalog) override { - invariant(distLockCatalog); - return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); - } - private: std::unique_ptr<KeysCollectionManager> _keyManager; }; diff --git a/src/mongo/db/logical_time_validator_test.cpp b/src/mongo/db/logical_time_validator_test.cpp index 90eddb5ec98..5db41df3d72 100644 --- a/src/mongo/db/logical_time_validator_test.cpp +++ b/src/mongo/db/logical_time_validator_test.cpp @@ -34,15 +34,11 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/logical_time.h" #include "mongo/db/logical_time_validator.h" -#include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" -#include "mongo/db/service_context.h" #include "mongo/db/signed_logical_time.h" #include "mongo/db/time_proof_service.h" #include "mongo/platform/basic.h" -#include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/config_server_test_fixture.h" -#include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" @@ -79,12 +75,6 @@ protected: ConfigServerTestFixture::tearDown(); } - std::unique_ptr<DistLockManager> makeDistLockManager( - std::unique_ptr<DistLockCatalog> distLockCatalog) override { - invariant(distLockCatalog); - return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog)); - } - /** * Forces KeyManager to refresh cache and generate new keys. */ diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index f5675135aad..c343ff63050 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -47,7 +47,6 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/session_catalog.h" #include "mongo/db/views/durable_view_catalog.h" @@ -458,12 +457,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) { if (!args.fromMigrate) { auto css = CollectionShardingState::get(opCtx, args.nss); - css->onUpdateOp(opCtx, - args.criteria, - args.update, - args.updatedDoc, - opTime.writeOpTime, - opTime.prePostImageOpTime); + css->onUpdateOp(opCtx, args.updatedDoc, opTime.writeOpTime, opTime.prePostImageOpTime); } } @@ -728,9 +722,6 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, cmdObj, nullptr); - auto css = CollectionShardingState::get(opCtx, collectionName); - css->onDropCollection(opCtx, collectionName); - // Evict namespace entry from the namespace/uuid cache if it exists. NamespaceUUIDCache::get(opCtx).evictNamespace(collectionName); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index fcbb40e3199..1019ee08d1c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -210,6 +210,7 @@ env.CppUnitTest( 'document_source_lookup_change_post_image_test.cpp', 'document_source_lookup_test.cpp', 'document_source_match_test.cpp', + 'document_source_merge_cursors_test.cpp', 'document_source_mock_test.cpp', 'document_source_project_test.cpp', 'document_source_redact_test.cpp', @@ -228,6 +229,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/s/sharding_router_test_fixture', '$BUILD_DIR/mongo/util/clock_source_mock', 'document_source_mock', 'document_value_test_util', @@ -235,24 +237,6 @@ env.CppUnitTest( ], ) -# This test depends on the sharding test fixture, which has global initializers that conflict with -# the ones set in 'document_source_test', so is split into its own test. -env.CppUnitTest( - target='document_source_merge_cursors_test', - source=[ - 'document_source_merge_cursors_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/query/query_request', - '$BUILD_DIR/mongo/db/query/query_test_service_context', - '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', - '$BUILD_DIR/mongo/s/sharding_test_fixture', - 'pipeline', - 'document_value_test_util', - ], -) - env.CppUnitTest( target='document_source_facet_test', source='document_source_facet_test.cpp', diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp index cdaf6b40bc4..4809ac54b50 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -46,7 +46,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_test_fixture.h" +#include "mongo/s/sharding_router_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/pipeline/document_source_sample_test.cpp b/src/mongo/db/pipeline/document_source_sample_test.cpp index 417a2ff5a34..c1c85924c50 100644 --- a/src/mongo/db/pipeline/document_source_sample_test.cpp +++ b/src/mongo/db/pipeline/document_source_sample_test.cpp @@ -48,24 +48,9 @@ #include "mongo/util/tick_source_mock.h" namespace mongo { - -std::unique_ptr<ServiceContextNoop> makeTestServiceContext() { - auto service = stdx::make_unique<ServiceContextNoop>(); - service->setFastClockSource(stdx::make_unique<ClockSourceMock>()); - service->setTickSource(stdx::make_unique<TickSourceMock>()); - return service; -} - namespace { -using boost::intrusive_ptr; -static const char* const ns = "unittests.document_source_sample_tests"; - -// Stub to avoid including the server environment library. -MONGO_INITIALIZER(SetGlobalEnvironment)(InitializerContext* context) { - setGlobalServiceContext(makeTestServiceContext()); - return Status::OK(); -} +using boost::intrusive_ptr; class SampleBasics : public AggregationContextFixture { public: diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 5206a25bfca..0c03223c985 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -25,10 +25,12 @@ env.Library( target='sharding_runtime_d', source=[ 'chunk_splitter.cpp', + 'config_server_op_observer.cpp', 'implicit_create_collection.cpp', 'migration_destination_manager.cpp', 'session_catalog_migration_destination.cpp', 'shard_filtering_metadata_refresh.cpp', + 'shard_server_op_observer.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/db_raii', @@ -158,7 +160,6 @@ env.CppUnitTest( ], LIBDEPS=[ '$BUILD_DIR/mongo/s/config_server_test_fixture', - '$BUILD_DIR/mongo/s/coreshard', '$BUILD_DIR/mongo/util/version_impl', 'balancer', ] diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 4b6c79c343c..fcabe61f658 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -47,7 +47,6 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" #include "mongo/s/shard_util.h" #include "mongo/stdx/memory.h" diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp index df1f28be264..beddabc66de 100644 --- a/src/mongo/db/s/collection_range_deleter_test.cpp +++ b/src/mongo/db/s/collection_range_deleter_test.cpp @@ -43,7 +43,7 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/s/shard_server_test_fixture.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -54,46 +54,38 @@ using unittest::assertGet; using Deletion = CollectionRangeDeleter::Deletion; const NamespaceString kNss = NamespaceString("foo", "bar"); -const std::string kPattern = "_id"; -const BSONObj kKeyPattern = BSON(kPattern << 1); -const std::string kShardName{"a"}; -const HostAndPort dummyHost("dummy", 123); +const std::string kShardKey = "_id"; +const BSONObj kShardKeyPattern = BSON(kShardKey << 1); const NamespaceString kAdminSysVer = NamespaceString("admin", "system.version"); -class CollectionRangeDeleterTest : public ShardingMongodTestFixture { +class CollectionRangeDeleterTest : public ShardServerTestFixture { protected: void setUp() override { - _epoch = OID::gen(); - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - ShardingMongodTestFixture::setUp(); - replicationCoordinator()->alwaysAllowWrites(true); - ASSERT_OK(initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost))); - - // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) - // ->setConnectionStringReturnValue(kConfigConnStr); + ShardServerTestFixture::setUp(); - configTargeter()->setFindHostReturnValue(dummyHost); + // Make every test run with a separate epoch + _epoch = OID::gen(); - DBDirectClient(operationContext()).createCollection(kNss.ns()); - { - AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); - auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss); - const KeyPattern skPattern(kKeyPattern); - auto cm = ChunkManager::makeNew( - kNss, - UUID::gen(), - kKeyPattern, - nullptr, - false, - epoch(), - {ChunkType(kNss, - ChunkRange{skPattern.globalMin(), skPattern.globalMax()}, - ChunkVersion(1, 0, epoch()), - ShardId("otherShard"))}); - collectionShardingState->refreshMetadata( - operationContext(), - stdx::make_unique<CollectionMetadata>(cm, ShardId("thisShard"))); - } + DBDirectClient client(operationContext()); + client.createCollection(kNss.ns()); + + const KeyPattern keyPattern(kShardKeyPattern); + auto cm = ChunkManager::makeNew( + kNss, + UUID::gen(), + keyPattern, + nullptr, + false, + epoch(), + {ChunkType(kNss, + ChunkRange{keyPattern.globalMin(), keyPattern.globalMax()}, + ChunkVersion(1, 0, epoch()), + ShardId("otherShard"))}); + + AutoGetCollection autoColl(operationContext(), kNss, MODE_IX); + auto const css = CollectionShardingState::get(operationContext(), kNss); + css->refreshMetadata(operationContext(), + stdx::make_unique<CollectionMetadata>(cm, ShardId("thisShard"))); } void tearDown() override { @@ -103,7 +95,7 @@ protected: collectionShardingState->refreshMetadata(operationContext(), nullptr); } - ShardingMongodTestFixture::tearDown(); + ShardServerTestFixture::tearDown(); } boost::optional<Date_t> next(CollectionRangeDeleter& rangeDeleter, int maxToDelete) { @@ -136,19 +128,20 @@ TEST_F(CollectionRangeDeleterTest, EmptyDatabase) { // Tests the case that there is data, but it is not in a range to clean. TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) { CollectionRangeDeleter rangeDeleter; - const BSONObj insertedDoc = BSON(kPattern << 25); + const BSONObj insertedDoc = BSON(kShardKey << 25); DBDirectClient dbclient(operationContext()); dbclient.insert(kNss.toString(), insertedDoc); - ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 25))); + ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kShardKey << 25))); std::list<Deletion> ranges; - ranges.emplace_back(Deletion{ChunkRange{BSON(kPattern << 0), BSON(kPattern << 10)}, Date_t{}}); + ranges.emplace_back( + Deletion{ChunkRange{BSON(kShardKey << 0), BSON(kShardKey << 10)}, Date_t{}}); auto when = rangeDeleter.add(std::move(ranges)); ASSERT(when && *when == Date_t{}); ASSERT_EQ(1u, rangeDeleter.size()); ASSERT_TRUE(next(rangeDeleter, 1)); ASSERT_EQ(0u, rangeDeleter.size()); - ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 25))); + ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kShardKey << 25))); ASSERT_FALSE(next(rangeDeleter, 1)); } @@ -156,19 +149,19 @@ TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) { // Tests the case that there is a single document within a range to clean. TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) { CollectionRangeDeleter rangeDeleter; - const BSONObj insertedDoc = BSON(kPattern << 5); + const BSONObj insertedDoc = BSON(kShardKey << 5); DBDirectClient dbclient(operationContext()); - dbclient.insert(kNss.toString(), BSON(kPattern << 5)); - ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 5))); + dbclient.insert(kNss.toString(), BSON(kShardKey << 5)); + ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kShardKey << 5))); std::list<Deletion> ranges; - auto deletion = Deletion{ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)), Date_t{}}; + auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}}; ranges.emplace_back(std::move(deletion)); auto when = rangeDeleter.add(std::move(ranges)); ASSERT(when && *when == Date_t{}); ASSERT_TRUE(ranges.empty()); // spliced elements out of it - auto optNotifn = rangeDeleter.overlaps(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10))); + auto optNotifn = rangeDeleter.overlaps(ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10))); ASSERT(optNotifn); auto notifn = *optNotifn; ASSERT(!notifn.ready()); @@ -182,31 +175,31 @@ TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) { ASSERT_TRUE(rangeDeleter.isEmpty()); ASSERT(notifn.ready() && notifn.waitStatus(operationContext()).isOK()); - ASSERT_TRUE(dbclient.findOne(kNss.toString(), QUERY(kPattern << 5)).isEmpty()); + ASSERT_TRUE(dbclient.findOne(kNss.toString(), QUERY(kShardKey << 5)).isEmpty()); ASSERT_FALSE(next(rangeDeleter, 1)); - ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); } // Tests the case that there are multiple documents within a range to clean. TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) { CollectionRangeDeleter rangeDeleter; DBDirectClient dbclient(operationContext()); - dbclient.insert(kNss.toString(), BSON(kPattern << 1)); - dbclient.insert(kNss.toString(), BSON(kPattern << 2)); - dbclient.insert(kNss.toString(), BSON(kPattern << 3)); - ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + dbclient.insert(kNss.toString(), BSON(kShardKey << 1)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 2)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 3)); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 5))); std::list<Deletion> ranges; - auto deletion = Deletion{ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)), Date_t{}}; + auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}}; ranges.emplace_back(std::move(deletion)); auto when = rangeDeleter.add(std::move(ranges)); ASSERT(when && *when == Date_t{}); ASSERT_TRUE(next(rangeDeleter, 100)); ASSERT_TRUE(next(rangeDeleter, 100)); - ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 5))); ASSERT_FALSE(next(rangeDeleter, 100)); - ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); } // Tests the case that there are multiple documents within a range to clean, and the range deleter @@ -214,70 +207,71 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) { TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCalls) { CollectionRangeDeleter rangeDeleter; DBDirectClient dbclient(operationContext()); - dbclient.insert(kNss.toString(), BSON(kPattern << 1)); - dbclient.insert(kNss.toString(), BSON(kPattern << 2)); - dbclient.insert(kNss.toString(), BSON(kPattern << 3)); - ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + dbclient.insert(kNss.toString(), BSON(kShardKey << 1)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 2)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 3)); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 5))); std::list<Deletion> ranges; - auto deletion = Deletion{ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)), Date_t{}}; + auto deletion = Deletion{ChunkRange(BSON(kShardKey << 0), BSON(kShardKey << 10)), Date_t{}}; ranges.emplace_back(std::move(deletion)); auto when = rangeDeleter.add(std::move(ranges)); ASSERT(when && *when == Date_t{}); ASSERT_TRUE(next(rangeDeleter, 1)); - ASSERT_EQUALS(2ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_EQUALS(2ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 5))); ASSERT_TRUE(next(rangeDeleter, 1)); - ASSERT_EQUALS(1ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_EQUALS(1ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 5))); ASSERT_TRUE(next(rangeDeleter, 1)); ASSERT_TRUE(next(rangeDeleter, 1)); - ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5))); + ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 5))); ASSERT_FALSE(next(rangeDeleter, 1)); - ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); } // Tests the case that there are two ranges to clean, each containing multiple documents. TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { CollectionRangeDeleter rangeDeleter; DBDirectClient dbclient(operationContext()); - dbclient.insert(kNss.toString(), BSON(kPattern << 1)); - dbclient.insert(kNss.toString(), BSON(kPattern << 2)); - dbclient.insert(kNss.toString(), BSON(kPattern << 3)); - dbclient.insert(kNss.toString(), BSON(kPattern << 4)); - dbclient.insert(kNss.toString(), BSON(kPattern << 5)); - dbclient.insert(kNss.toString(), BSON(kPattern << 6)); - ASSERT_EQUALS(6ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10))); + dbclient.insert(kNss.toString(), BSON(kShardKey << 1)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 2)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 3)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 4)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 5)); + dbclient.insert(kNss.toString(), BSON(kShardKey << 6)); + ASSERT_EQUALS(6ULL, dbclient.count(kNss.toString(), BSON(kShardKey << LT << 10))); std::list<Deletion> ranges; auto later = Date_t::now(); - ranges.emplace_back(Deletion{ChunkRange{BSON(kPattern << 0), BSON(kPattern << 3)}, later}); + ranges.emplace_back(Deletion{ChunkRange{BSON(kShardKey << 0), BSON(kShardKey << 3)}, later}); auto when = rangeDeleter.add(std::move(ranges)); ASSERT(when && *when == later); ASSERT_TRUE(ranges.empty()); // not guaranteed by std, but failure would indicate a problem. std::list<Deletion> ranges2; - ranges2.emplace_back(Deletion{ChunkRange{BSON(kPattern << 4), BSON(kPattern << 7)}, later}); + ranges2.emplace_back(Deletion{ChunkRange{BSON(kShardKey << 4), BSON(kShardKey << 7)}, later}); when = rangeDeleter.add(std::move(ranges2)); ASSERT(!when); std::list<Deletion> ranges3; - ranges3.emplace_back(Deletion{ChunkRange{BSON(kPattern << 3), BSON(kPattern << 4)}, Date_t{}}); + ranges3.emplace_back( + Deletion{ChunkRange{BSON(kShardKey << 3), BSON(kShardKey << 4)}, Date_t{}}); when = rangeDeleter.add(std::move(ranges3)); ASSERT(when); - auto optNotifn1 = rangeDeleter.overlaps(ChunkRange{BSON(kPattern << 0), BSON(kPattern << 3)}); + auto optNotifn1 = rangeDeleter.overlaps(ChunkRange{BSON(kShardKey << 0), BSON(kShardKey << 3)}); ASSERT_TRUE(optNotifn1); auto& notifn1 = *optNotifn1; ASSERT_FALSE(notifn1.ready()); - auto optNotifn2 = rangeDeleter.overlaps(ChunkRange{BSON(kPattern << 4), BSON(kPattern << 7)}); + auto optNotifn2 = rangeDeleter.overlaps(ChunkRange{BSON(kShardKey << 4), BSON(kShardKey << 7)}); ASSERT_TRUE(optNotifn2); auto& notifn2 = *optNotifn2; ASSERT_FALSE(notifn2.ready()); - auto optNotifn3 = rangeDeleter.overlaps(ChunkRange{BSON(kPattern << 3), BSON(kPattern << 4)}); + auto optNotifn3 = rangeDeleter.overlaps(ChunkRange{BSON(kShardKey << 3), BSON(kShardKey << 4)}); ASSERT_TRUE(optNotifn3); auto& notifn3 = *optNotifn3; ASSERT_FALSE(notifn3.ready()); @@ -289,9 +283,9 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { ASSERT_FALSE(notifn1 != *optNotifn1); // no op log entry yet - ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); - ASSERT_EQUALS(6ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 7))); + ASSERT_EQUALS(6ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 7))); // catch range3, [3..4) only auto next1 = next(rangeDeleter, 100); @@ -299,11 +293,11 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { ASSERT_EQUALS(*next1, Date_t{}); // no op log entry for immediate deletions - ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); // 3 gone - ASSERT_EQUALS(5ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 7))); - ASSERT_EQUALS(2ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 4))); + ASSERT_EQUALS(5ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 7))); + ASSERT_EQUALS(2ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 4))); ASSERT_FALSE(notifn1.ready()); // no trigger yet ASSERT_FALSE(notifn2.ready()); // no trigger yet @@ -315,11 +309,11 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { ASSERT_EQUALS(*next2, Date_t{}); // still no op log entry, because not delayed - ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(0ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); // deleted 1, 5 left - ASSERT_EQUALS(2ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 4))); - ASSERT_EQUALS(5ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 10))); + ASSERT_EQUALS(2ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 4))); + ASSERT_EQUALS(5ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 10))); ASSERT_FALSE(notifn1.ready()); // no trigger yet ASSERT_FALSE(notifn2.ready()); // no trigger yet @@ -336,9 +330,9 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { ASSERT_FALSE(notifn2.ready()); // no trigger yet // deleted 3, 3 left - ASSERT_EQUALS(3ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 10))); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 10))); - ASSERT_EQUALS(1ULL, dbclient.count(kAdminSysVer.ns(), BSON(kPattern << "startRangeDeletion"))); + ASSERT_EQUALS(1ULL, dbclient.count(kAdminSysVer.ns(), BSON(kShardKey << "startRangeDeletion"))); // clang-format off ASSERT_BSONOBJ_EQ( BSON("_id" << "startRangeDeletion" << "ns" << kNss.ns() @@ -364,7 +358,7 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { // clang-format on // still 3 left - ASSERT_EQUALS(3ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 10))); + ASSERT_EQUALS(3ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 10))); // delete the remaining documents auto next5 = next(rangeDeleter, 100); @@ -382,7 +376,7 @@ TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) { // clang-format on // all docs gone - ASSERT_EQUALS(0ULL, dbclient.count(kNss.ns(), BSON(kPattern << LT << 10))); + ASSERT_EQUALS(0ULL, dbclient.count(kNss.ns(), BSON(kShardKey << LT << 10))); // discover there are no more, pop range 2 auto next6 = next(rangeDeleter, 100); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 3c8d4d6e8d6..d19955f6a3c 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -37,29 +37,16 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/shard_identity_rollback_notifier.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/type_shard_identity.h" -#include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" #include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/type_config_version.h" -#include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog/type_shard_collection.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/catalog_cache_loader.h" -#include "mongo/s/cluster_identity_loader.h" -#include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -169,64 +156,6 @@ private: const auto getCollectionShardingStateMap = ServiceContext::declareDecoration<CollectionShardingStateMap>(); -/** - * Used to perform shard identity initialization once it is certain that the document is committed. - */ -class ShardIdentityLogOpHandler final : public RecoveryUnit::Change { -public: - ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity) - : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {} - - void commit() override { - fassertNoTrace( - 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity)); - } - - void rollback() override {} - -private: - OperationContext* _opCtx; - const ShardIdentityType _shardIdentity; -}; - -/** - * Used to notify the catalog cache loader of a new collection version and invalidate the in-memory - * routing table cache once the oplog updates are committed and become visible. - */ -class CollectionVersionLogOpHandler final : public RecoveryUnit::Change { -public: - CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss) - : _opCtx(opCtx), _nss(nss) {} - - void commit() override { - invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - - CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); - - // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X - // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until - // SERVER-31595 removes the X lock requirement. - CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown(); - } - - void rollback() override {} - -private: - OperationContext* _opCtx; - const NamespaceString _nss; -}; - -/** - * Caller must hold the global lock in some mode other than MODE_NONE. - */ -bool isStandaloneOrPrimary(OperationContext* opCtx) { - dassert(opCtx->lockState()->isLocked()); - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - return !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == - repl::MemberState::RS_PRIMARY); -} - } // namespace CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss) @@ -394,24 +323,6 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const repl::OpTime& opTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (_nss == NamespaceString::kServerConfigurationNamespace) { - if (auto idElem = insertedDoc["_id"]) { - if (idElem.str() == ShardIdentityType::IdName) { - auto shardIdentityDoc = - uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); - uassertStatusOK(shardIdentityDoc.validate()); - opCtx->recoveryUnit()->registerChange( - new ShardIdentityLogOpHandler(opCtx, std::move(shardIdentityDoc))); - } - } - } - - if (ShardingState::get(opCtx)->enabled()) { - _incrementChunkOnInsertOrUpdate(opCtx, insertedDoc, insertedDoc.objsize()); - } - } - checkShardVersionOrThrow(opCtx); if (_sourceMgr) { @@ -420,23 +331,11 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, } void CollectionShardingState::onUpdateOp(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, const BSONObj& updatedDoc, const repl::OpTime& opTime, const repl::OpTime& prePostImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (_nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) { - _onConfigCollectionsUpdateOp(opCtx, query, update, updatedDoc); - } - - if (ShardingState::get(opCtx)->enabled()) { - _incrementChunkOnInsertOrUpdate(opCtx, updatedDoc, update.objsize()); - } - } - checkShardVersionOrThrow(opCtx); if (_sourceMgr) { @@ -455,41 +354,6 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const repl::OpTime& preImageOpTime) { dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { - if (_nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) { - _onConfigDeleteInvalidateCachedMetadataAndNotify(opCtx, deleteState.documentKey); - } - - if (_nss == NamespaceString::kServerConfigurationNamespace) { - if (auto idElem = deleteState.documentKey["_id"]) { - auto idStr = idElem.str(); - if (idStr == ShardIdentityType::IdName) { - if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { - uasserted(40070, - "cannot delete shardIdentity document while in --shardsvr mode"); - } else { - warning() << "Shard identity document rolled back. Will shut down after " - "finishing rollback."; - ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); - } - } - } - } - } - - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (_nss == VersionType::ConfigNS) { - if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { - uasserted(40302, "cannot delete config.version document while in --configsvr mode"); - } else { - // Throw out any cached information related to the cluster ID. - ShardingCatalogManager::get(opCtx) - ->discardCachedConfigDatabaseInitializationState(); - ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); - } - } - } - checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { @@ -497,100 +361,6 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, } } -void CollectionShardingState::onDropCollection(OperationContext* opCtx, - const NamespaceString& collectionName) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - - if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && - _nss == NamespaceString::kServerConfigurationNamespace) { - // Dropping system collections is not allowed for end users. - invariant(!opCtx->writesAreReplicated()); - invariant(repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()); - - // Can't confirm whether there was a ShardIdentity document or not yet, so assume there was - // one and shut down the process to clear the in-memory sharding state. - warning() << "admin.system.version collection rolled back. Will shut down after " - "finishing rollback"; - ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); - } - - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - if (_nss == VersionType::ConfigNS) { - if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { - uasserted(40303, "cannot drop config.version document while in --configsvr mode"); - } else { - // Throw out any cached information related to the cluster ID. - ShardingCatalogManager::get(opCtx) - ->discardCachedConfigDatabaseInitializationState(); - ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); - } - } - } -} - -void CollectionShardingState::_onConfigCollectionsUpdateOp(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, - const BSONObj& updatedDoc) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - - // Notification of routing table changes are only needed on secondaries. - if (isStandaloneOrPrimary(opCtx)) { - return; - } - - // Extract which user collection was updated. - std::string updatedCollection; - fassert(40477, - bsonExtractStringField(query, ShardCollectionType::ns.name(), &updatedCollection)); - - // Parse the '$set' update. - BSONElement setElement; - Status setStatus = bsonExtractTypedField(update, StringData("$set"), Object, &setElement); - if (setStatus.isOK()) { - BSONObj setField = setElement.Obj(); - const NamespaceString updatedNss(updatedCollection); - - // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). - AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); - - if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { - opCtx->recoveryUnit()->registerChange( - new CollectionVersionLogOpHandler(opCtx, updatedNss)); - } - - if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) { - // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the - // X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until - // SERVER-31595 removes the X lock requirement. - CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown(); - } - } -} - -void CollectionShardingState::_onConfigDeleteInvalidateCachedMetadataAndNotify( - OperationContext* opCtx, const BSONObj& query) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); - - // Notification of routing table changes are only needed on secondaries. - if (isStandaloneOrPrimary(opCtx)) { - return; - } - - // Extract which collection entry is being deleted from the _id field. - std::string deletedCollection; - fassert(40479, - bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection)); - const NamespaceString deletedNss(deletedCollection); - - // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). - AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX); - - opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss)); -} - bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, std::string* errmsg, ChunkVersion* expectedShardVersion, @@ -681,58 +451,4 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, MONGO_UNREACHABLE; } -uint64_t CollectionShardingState::_incrementChunkOnInsertOrUpdate(OperationContext* opCtx, - const BSONObj& document, - long dataWritten) { - - // Here, get the collection metadata and check if it exists. If it doesn't exist, then the - // collection is not sharded, and we can simply return -1. - ScopedCollectionMetadata metadata = getMetadata(); - if (!metadata) { - return -1; - } - - std::shared_ptr<ChunkManager> cm = metadata->getChunkManager(); - const ShardKeyPattern& shardKeyPattern = cm->getShardKeyPattern(); - - // Each inserted/updated document should contain the shard key. The only instance in which a - // document could not contain a shard key is if the insert/update is performed through mongod - // explicitly, as opposed to first routed through mongos. - BSONObj shardKey = shardKeyPattern.extractShardKeyFromDoc(document); - if (shardKey.woCompare(BSONObj()) == 0) { - warning() << "inserting document " << document.toString() << " without shard key pattern " - << shardKeyPattern << " into a sharded collection"; - return -1; - } - - // Use the shard key to locate the chunk into which the document was updated, and increment the - // number of bytes tracked for the chunk. Note that we can assume the simple collation, because - // shard keys do not support non-simple collations. - auto chunk = cm->findIntersectingChunkWithSimpleCollation(shardKey); - chunk->addBytesWritten(dataWritten); - - // If the chunk becomes too large, then we call the ChunkSplitter to schedule a split. Then, we - // reset the tracking for that chunk to 0. - if (_shouldSplitChunk(opCtx, shardKeyPattern, *chunk)) { - // TODO: call ChunkSplitter here - chunk->clearBytesWritten(); - } - - return chunk->getBytesWritten(); -} - -bool CollectionShardingState::_shouldSplitChunk(OperationContext* opCtx, - const ShardKeyPattern& shardKeyPattern, - const Chunk& chunk) { - - const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - invariant(balancerConfig); - - const KeyPattern keyPattern = shardKeyPattern.getKeyPattern(); - const bool minIsInf = (0 == keyPattern.globalMin().woCompare(chunk.getMin())); - const bool maxIsInf = (0 == keyPattern.globalMax().woCompare(chunk.getMax())); - - return chunk.shouldSplit(balancerConfig->getMaxChunkSizeBytes(), minIsInf, maxIsInf); -} - } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 30ba8a295d0..14b403cd93f 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -32,21 +32,14 @@ #include <string> #include "mongo/base/disallow_copying.h" -#include "mongo/base/string_data.h" +#include "mongo/bson/bsonobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/metadata_manager.h" -#include "mongo/util/concurrency/notification.h" namespace mongo { -class BalancerConfiguration; -class BSONObj; -class BSONObjBuilder; -struct ChunkVersion; -class CollectionMetadata; class MigrationSourceManager; class OperationContext; -class Timestamp; /** * Contains all sharding-related runtime state for a given collection. One such object is assigned @@ -229,8 +222,6 @@ public: const BSONObj& insertedDoc, const repl::OpTime& opTime); void onUpdateOp(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, const BSONObj& updatedDoc, const repl::OpTime& opTime, const repl::OpTime& prePostImageOpTime); @@ -238,52 +229,9 @@ public: const DeleteState& deleteState, const repl::OpTime& opTime, const repl::OpTime& preImageOpTime); - void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); private: /** - * This runs on updates to the shard's persisted cache of the config server's - * config.collections collection. - * - * If an update occurs to the 'lastRefreshedCollectionVersion' field, registers a task on the - * opCtx -- to run after writes from the oplog are committed and visible to reads -- to notify - * the catalog cache loader of a new collection version and clear the routing table so the next - * caller with routing information will provoke a routing table refresh. When - * 'lastRefreshedCollectionVersion' is in 'update', it means that a chunk metadata refresh - * finished being applied to the collection's locally persisted metadata store. - * - * If an update occurs to the 'enterCriticalSectionSignal' field, simply clear the routing table - * immediately. This will provoke the next secondary caller to refresh through the primary, - * blocking behind the critical section. - * - * query - BSON with an _id that identifies which collections entry is being updated. - * update - the update being applied to the collections entry. - * updatedDoc - the document identified by 'query' with the 'update' applied. - * - * This only runs on secondaries. - * The global exclusive lock is expected to be held by the caller. - */ - void _onConfigCollectionsUpdateOp(OperationContext* opCtx, - const BSONObj& query, - const BSONObj& update, - const BSONObj& updatedDoc); - - /** - * Invalidates the in-memory routing table cache when a collection is dropped, so the next - * caller with routing information will provoke a routing table refresh and see the drop. - * - * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible - * to reads. - * - * query - BSON with an _id field that identifies which collections entry is being updated. - * - * This only runs on secondaries. - * The global exclusive lock is expected to be held by the caller. - */ - void _onConfigDeleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx, - const BSONObj& query); - - /** * Checks whether the shard version of the operation matches that of the collection. * * opCtx - Operation context from which to retrieve the operation's expected version. @@ -301,23 +249,6 @@ private: ChunkVersion* expectedShardVersion, ChunkVersion* actualShardVersion); - /** - * If the collection is sharded, finds the chunk that contains the specified document, and - * increments the size tracked for that chunk by the specified amount of data written, in - * bytes. Returns the number of total bytes on that chunk, after the data is written. - */ - uint64_t _incrementChunkOnInsertOrUpdate(OperationContext* opCtx, - const BSONObj& document, - long dataWritten); - - /** - * Returns true if the total number of bytes on the specified chunk nears the max size of - * a shard. - */ - bool _shouldSplitChunk(OperationContext* opCtx, - const ShardKeyPattern& shardKeyPattern, - const Chunk& chunk); - // Namespace this state belongs to. const NamespaceString _nss; diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index ee083fec502..d50d0b10d86 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -28,6 +28,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" @@ -39,7 +40,7 @@ namespace { const NamespaceString kTestNss("TestDB", "TestColl"); -class CollShardingStateTest : public ShardServerTestFixture { +class CollectionShardingStateTest : public ShardServerTestFixture { public: void setUp() override { ShardServerTestFixture::setUp(); @@ -62,106 +63,63 @@ private: int _initCallCount = 0; }; -TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { - // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on - // the shard identity document. - Lock::GlobalWrite lock(operationContext()); - - CollectionShardingState collShardingState(getServiceContext(), - NamespaceString::kServerConfigurationNamespace); - +TEST_F(CollectionShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); - - ASSERT_EQ(0, getInitCallCount()); - - wuow.commit(); - + DBDirectClient client(operationContext()); + client.insert("admin.system.version", shardIdentity.toBSON()); ASSERT_EQ(1, getInitCallCount()); } -TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { - // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on - // the shard identity document. - Lock::GlobalWrite lock(operationContext()); - - CollectionShardingState collShardingState(getServiceContext(), - NamespaceString::kServerConfigurationNamespace); - +TEST_F(CollectionShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); + // This part of the test ensures that the collection exists for the AutoGetCollection below to + // find and also validates that the initializer does not get called for non-sharding documents + DBDirectClient client(operationContext()); + client.insert("admin.system.version", BSON("_id" << 1)); + ASSERT_EQ(0, getInitCallCount()); + { - WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); + AutoGetCollection autoColl( + operationContext(), NamespaceString("admin.system.version"), MODE_IX); + WriteUnitOfWork wuow(operationContext()); + ASSERT_OK(autoColl.getCollection()->insertDocument( + operationContext(), shardIdentity.toBSON(), {}, false)); ASSERT_EQ(0, getInitCallCount()); } ASSERT_EQ(0, getInitCallCount()); } -TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) { - // Must hold a lock to call initializeFromShardIdentity, which is called by the op observer on - // the shard identity document. - Lock::GlobalWrite lock(operationContext()); - - CollectionShardingState collShardingState(getServiceContext(), NamespaceString("admin.user")); - +TEST_F(CollectionShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}); - - ASSERT_EQ(0, getInitCallCount()); - - wuow.commit(); - + DBDirectClient client(operationContext()); + client.insert("admin.user", shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); } -TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) { - // Must hold a lock to call CollectionShardingState::onInsertOp. - Lock::GlobalWrite lock(operationContext()); - - CollectionShardingState collShardingState(getServiceContext(), - NamespaceString::kServerConfigurationNamespace); - +TEST_F(CollectionShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) { ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}), - AssertionException); -} - -TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNotInserted) { - // Must hold a lock to call CollectionShardingState::onInsertOp. - Lock::GlobalWrite lock(operationContext()); - - CollectionShardingState collShardingState(getServiceContext(), - NamespaceString::kServerConfigurationNamespace); - - WriteUnitOfWork wuow(operationContext()); - collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {}); - - ASSERT_EQ(0, getInitCallCount()); - - wuow.commit(); - - ASSERT_EQ(0, getInitCallCount()); + DBDirectClient client(operationContext()); + client.insert("admin.system.version", shardIdentity.toBSON()); + ASSERT(!client.getLastError().empty()); } /** @@ -170,7 +128,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot * that DeleteState's constructor will extract from its `doc` argument into its member * DeleteState::documentKey. */ -auto makeAMetadata(BSONObj const& keyPattern) -> std::unique_ptr<CollectionMetadata> { +std::unique_ptr<CollectionMetadata> makeAMetadata(BSONObj const& keyPattern) { const OID epoch = OID::gen(); auto range = ChunkRange(BSON("key" << MINKEY), BSON("key" << MAXKEY)); auto chunk = ChunkType(kTestNss, std::move(range), ChunkVersion(1, 0, epoch), ShardId("other")); @@ -179,7 +137,9 @@ auto makeAMetadata(BSONObj const& keyPattern) -> std::unique_ptr<CollectionMetad return stdx::make_unique<CollectionMetadata>(std::move(cm), ShardId("this")); } -TEST_F(CollShardingStateTest, MakeDeleteStateUnsharded) { +using DeleteStateTest = ShardServerTestFixture; + +TEST_F(DeleteStateTest, MakeDeleteStateUnsharded) { AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); auto* css = CollectionShardingState::get(operationContext(), kTestNss); @@ -201,7 +161,7 @@ TEST_F(CollShardingStateTest, MakeDeleteStateUnsharded) { ASSERT_FALSE(deleteState.isMigrating); } -TEST_F(CollShardingStateTest, MakeDeleteStateShardedWithoutIdInShardKey) { +TEST_F(DeleteStateTest, MakeDeleteStateShardedWithoutIdInShardKey) { AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); auto* css = CollectionShardingState::get(operationContext(), kTestNss); @@ -228,7 +188,7 @@ TEST_F(CollShardingStateTest, MakeDeleteStateShardedWithoutIdInShardKey) { ASSERT_FALSE(deleteState.isMigrating); } -TEST_F(CollShardingStateTest, MakeDeleteStateShardedWithIdInShardKey) { +TEST_F(DeleteStateTest, MakeDeleteStateShardedWithIdInShardKey) { AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); auto* css = CollectionShardingState::get(operationContext(), kTestNss); @@ -254,7 +214,7 @@ TEST_F(CollShardingStateTest, MakeDeleteStateShardedWithIdInShardKey) { ASSERT_FALSE(deleteState.isMigrating); } -TEST_F(CollShardingStateTest, MakeDeleteStateShardedWithIdHashInShardKey) { +TEST_F(DeleteStateTest, MakeDeleteStateShardedWithIdHashInShardKey) { AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX); auto* css = CollectionShardingState::get(operationContext(), kTestNss); diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp new file mode 100644 index 00000000000..6b52c94ef37 --- /dev/null +++ b/src/mongo/db/s/config_server_op_observer.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/config_server_op_observer.h" + +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/catalog/type_config_version.h" +#include "mongo/s/cluster_identity_loader.h" + +namespace mongo { + +ConfigServerOpObserver::ConfigServerOpObserver() = default; + +ConfigServerOpObserver::~ConfigServerOpObserver() = default; + +void ConfigServerOpObserver::onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { + if (nss == VersionType::ConfigNS) { + if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { + uasserted(40302, "cannot delete config.version document while in --configsvr mode"); + } else { + // Throw out any cached information related to the cluster ID. + ShardingCatalogManager::get(opCtx)->discardCachedConfigDatabaseInitializationState(); + ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); + } + } +} + +repl::OpTime ConfigServerOpObserver::onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) { + if (collectionName == VersionType::ConfigNS) { + if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { + uasserted(40303, "cannot drop config.version document while in --configsvr mode"); + } else { + // Throw out any cached information related to the cluster ID. + ShardingCatalogManager::get(opCtx)->discardCachedConfigDatabaseInitializationState(); + ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); + } + } + + return {}; +} + +} // namespace mongo diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h new file mode 100644 index 00000000000..6bea48f4a11 --- /dev/null +++ b/src/mongo/db/s/config_server_op_observer.h @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/op_observer.h" + +namespace mongo { + +/** + * OpObserver which is installed on the op observers chain when the server is running as a config + * server (--configsvr). + */ +class ConfigServerOpObserver final : public OpObserver { + MONGO_DISALLOW_COPYING(ConfigServerOpObserver); + +public: + ConfigServerOpObserver(); + ~ConfigServerOpObserver(); + + void onCreateIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + BSONObj indexDoc, + bool fromMigrate) override {} + + void onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) override {} + + void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {} + + void aboutToDelete(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& doc) override {} + + void onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) override; + + void onInternalOpMessage(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID> uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj) override {} + + void onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex) override {} + + void onCollMod(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const BSONObj& collModCmd, + const CollectionOptions& oldCollOptions, + boost::optional<TTLCollModInfo> ttlInfo) override {} + + void onDropDatabase(OperationContext* opCtx, const std::string& dbName) override {} + + repl::OpTime onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) override; + + void onDropIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const std::string& indexName, + const BSONObj& indexInfo) override {} + + repl::OpTime onRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + OptionalCollectionUUID uuid, + bool dropTarget, + OptionalCollectionUUID dropTargetUUID, + bool stayTemp) override { + return repl::OpTime(); + } + + void onApplyOps(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& applyOpCmd) override {} + + void onEmptyCapped(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) override {} + + void onTransactionCommit(OperationContext* opCtx) override {} + + void onTransactionAbort(OperationContext* opCtx) override {} + + void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) {} +}; + +} // namespace mongo diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index 9c8e670dc2b..de7b0adce82 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -31,7 +31,6 @@ #include <boost/optional.hpp> #include "mongo/bson/bsonobjbuilder.h" -#include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" @@ -42,11 +41,10 @@ #include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" -#include "mongo/db/service_context_d_test_fixture.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/s/shard_server_test_fixture.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" @@ -59,29 +57,17 @@ using unittest::assertGet; const NamespaceString kNss("TestDB", "TestColl"); const std::string kPattern = "key"; -const BSONObj kShardKeyPatternBSON{BSON(kPattern << 1)}; -const KeyPattern kShardKeyPattern{kShardKeyPatternBSON}; +const KeyPattern kShardKeyPattern(BSON(kPattern << 1)); const std::string kThisShard{"thisShard"}; const std::string kOtherShard{"otherShard"}; -const HostAndPort dummyHost("dummy", 123); -class MetadataManagerTest : public ShardingMongodTestFixture { +class MetadataManagerTest : public ShardServerTestFixture { protected: void setUp() override { - ShardingMongodTestFixture::setUp(); - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost)) - .transitional_ignore(); - - configTargeter()->setFindHostReturnValue(dummyHost); - + ShardServerTestFixture::setUp(); _manager = std::make_shared<MetadataManager>(getServiceContext(), kNss, executor()); } - std::shared_ptr<RemoteCommandTargeterMock> configTargeter() const { - return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); - } - static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() { const OID epoch = OID::gen(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 34d91019d5e..49aec18b0fd 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -32,12 +32,11 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/s/shard_server_test_fixture.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -64,23 +63,17 @@ const ConnectionString kRecipientConnStr = HostAndPort("RecipientHost2:1234"), HostAndPort("RecipientHost3:1234")}); -class MigrationChunkClonerSourceLegacyTest : public ShardingMongodTestFixture { +class MigrationChunkClonerSourceLegacyTest : public ShardServerTestFixture { protected: void setUp() override { - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - ShardingMongodTestFixture::setUp(); + ShardServerTestFixture::setUp(); // TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it // actually needs to bypass the op observer. replicationCoordinator()->alwaysAllowWrites(true); - ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr)); - _client.emplace(operationContext()); - RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) - ->setConnectionStringReturnValue(kConfigConnStr); - { auto donorShard = assertGet( shardRegistry()->getShard(operationContext(), kDonorConnStr.getSetName())); @@ -103,7 +96,7 @@ protected: void tearDown() override { _client.reset(); - ShardingMongodTestFixture::tearDown(); + ShardServerTestFixture::tearDown(); } /** diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 698f0464029..f1a2b422305 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -49,13 +49,12 @@ #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/sharding_mongod_test_fixture.h" +#include "mongo/s/shard_server_test_fixture.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" namespace mongo { - namespace { using executor::RemoteCommandRequest; @@ -113,16 +112,12 @@ repl::OplogEntry extractInnerOplog(const repl::OplogEntry& oplog) { return oplogStatus.getValue(); } -class SessionCatalogMigrationDestinationTest : public ShardingMongodTestFixture { +class SessionCatalogMigrationDestinationTest : public ShardServerTestFixture { public: void setUp() override { - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - ShardingMongodTestFixture::setUp(); - - ASSERT_OK(initializeGlobalShardingStateForMongodForTest(kConfigConnStr)); + ShardServerTestFixture::setUp(); - RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()) - ->setConnectionStringReturnValue(kConfigConnStr); + _migrationId = MigrationSessionId::generate("donor", "recipient"); { auto donorShard = assertGet( @@ -133,8 +128,6 @@ public: ->setFindHostReturnValue(kDonorConnStr.getServers()[0]); } - _migrationId = MigrationSessionId::generate("donor", "recipient"); - SessionCatalog::create(getServiceContext()); SessionCatalog::get(getServiceContext())->onStepUp(operationContext()); LogicalSessionCache::set(getServiceContext(), stdx::make_unique<LogicalSessionCacheNoop>()); @@ -142,7 +135,7 @@ public: void tearDown() override { SessionCatalog::reset_forTest(getServiceContext()); - ShardingMongodTestFixture::tearDown(); + ShardServerTestFixture::tearDown(); } void returnOplog(const std::vector<OplogEntry>& oplogList) { diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp new file mode 100644 index 00000000000..6c0d678d85f --- /dev/null +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -0,0 +1,348 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/shard_server_op_observer.h" + +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/shard_identity_rollback_notifier.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/s/balancer_configuration.h" +#include "mongo/s/catalog/type_shard_collection.h" +#include "mongo/s/catalog_cache_loader.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +using DeleteState = CollectionShardingState::DeleteState; + +const OperationContext::Decoration<DeleteState> getDeleteState = + OperationContext::declareDecoration<DeleteState>(); + +bool isStandaloneOrPrimary(OperationContext* opCtx) { + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + return !isReplSet || (repl::ReplicationCoordinator::get(opCtx)->getMemberState() == + repl::MemberState::RS_PRIMARY); +} + +/** + * Used to notify the catalog cache loader of a new collection version and invalidate the in-memory + * routing table cache once the oplog updates are committed and become visible. + */ +class CollectionVersionLogOpHandler final : public RecoveryUnit::Change { +public: + CollectionVersionLogOpHandler(OperationContext* opCtx, const NamespaceString& nss) + : _opCtx(opCtx), _nss(nss) {} + + void commit() override { + invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + + CatalogCacheLoader::get(_opCtx).notifyOfCollectionVersionUpdate(_nss); + + // This is a hack to get around CollectionShardingState::refreshMetadata() requiring the X + // lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary measure until + // SERVER-31595 removes the X lock requirement. + CollectionShardingState::get(_opCtx, _nss)->markNotShardedAtStepdown(); + } + + void rollback() override {} + +private: + OperationContext* _opCtx; + const NamespaceString _nss; +}; + +/** + * Used to perform shard identity initialization once it is certain that the document is committed. + */ +class ShardIdentityLogOpHandler final : public RecoveryUnit::Change { +public: + ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity) + : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {} + + void commit() override { + fassertNoTrace( + 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity)); + } + + void rollback() override {} + +private: + OperationContext* _opCtx; + const ShardIdentityType _shardIdentity; +}; + +/** + * Invalidates the in-memory routing table cache when a collection is dropped, so the next caller + * with routing information will provoke a routing table refresh and see the drop. + * + * The query parameter must contain an _id field that identifies which collections entry is being + * updated. + * + * This only runs on secondaries. + * The global exclusive lock is expected to be held by the caller. + */ +void onConfigDeleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx, + const BSONObj& query) { + // Notification of routing table changes are only needed on secondaries + if (isStandaloneOrPrimary(opCtx)) { + return; + } + + // Extract which collection entry is being deleted from the _id field. + std::string deletedCollection; + fassert(40479, + bsonExtractStringField(query, ShardCollectionType::ns.name(), &deletedCollection)); + const NamespaceString deletedNss(deletedCollection); + + // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit(). + AutoGetCollection autoColl(opCtx, deletedNss, MODE_IX); + + opCtx->recoveryUnit()->registerChange(new CollectionVersionLogOpHandler(opCtx, deletedNss)); +} + +/** + * Returns true if the total number of bytes on the specified chunk nears the max size of a shard. + */ +bool shouldSplitChunk(OperationContext* opCtx, + const ShardKeyPattern& shardKeyPattern, + const Chunk& chunk) { + const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); + invariant(balancerConfig); + + const auto& keyPattern = shardKeyPattern.getKeyPattern(); + const bool minIsInf = (0 == keyPattern.globalMin().woCompare(chunk.getMin())); + const bool maxIsInf = (0 == keyPattern.globalMax().woCompare(chunk.getMax())); + + return chunk.shouldSplit(balancerConfig->getMaxChunkSizeBytes(), minIsInf, maxIsInf); +} + +/** + * If the collection is sharded, finds the chunk that contains the specified document and increments + * the size tracked for that chunk by the specified amount of data written, in bytes. Returns the + * number of total bytes on that chunk after the data is written. + */ +void incrementChunkOnInsertOrUpdate(OperationContext* opCtx, + const ChunkManager& chunkManager, + const BSONObj& document, + long dataWritten) { + const auto& shardKeyPattern = chunkManager.getShardKeyPattern(); + + // Each inserted/updated document should contain the shard key. The only instance in which a + // document could not contain a shard key is if the insert/update is performed through mongod + // explicitly, as opposed to first routed through mongos. + BSONObj shardKey = shardKeyPattern.extractShardKeyFromDoc(document); + if (shardKey.woCompare(BSONObj()) == 0) { + warning() << "inserting document " << document.toString() << " without shard key pattern " + << shardKeyPattern << " into a sharded collection"; + return; + } + + // Use the shard key to locate the chunk into which the document was updated, and increment the + // number of bytes tracked for the chunk. + // + // Note that we can assume the simple collation, because shard keys do not support non-simple + // collations. + auto chunk = chunkManager.findIntersectingChunkWithSimpleCollation(shardKey); + chunk->addBytesWritten(dataWritten); + + // If the chunk becomes too large, then we call the ChunkSplitter to schedule a split. Then, we + // reset the tracking for that chunk to 0. + if (shouldSplitChunk(opCtx, shardKeyPattern, *chunk)) { + // TODO: call ChunkSplitter here + chunk->clearBytesWritten(); + } +} + +} // namespace + +ShardServerOpObserver::ShardServerOpObserver() = default; + +ShardServerOpObserver::~ShardServerOpObserver() = default; + +void ShardServerOpObserver::onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) { + auto const css = CollectionShardingState::get(opCtx, nss); + const auto metadata = css->getMetadata(); + + for (auto it = begin; it != end; ++it) { + const auto& insertedDoc = it->doc; + + if (nss == NamespaceString::kServerConfigurationNamespace) { + if (auto idElem = insertedDoc["_id"]) { + if (idElem.str() == ShardIdentityType::IdName) { + auto shardIdentityDoc = + uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); + uassertStatusOK(shardIdentityDoc.validate()); + opCtx->recoveryUnit()->registerChange( + new ShardIdentityLogOpHandler(opCtx, std::move(shardIdentityDoc))); + } + } + } + + if (metadata) { + incrementChunkOnInsertOrUpdate( + opCtx, *metadata->getChunkManager(), insertedDoc, insertedDoc.objsize()); + } + } +} + +void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { + auto const css = CollectionShardingState::get(opCtx, args.nss); + const auto metadata = css->getMetadata(); + + if (args.nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) { + // Notification of routing table changes are only needed on secondaries + if (isStandaloneOrPrimary(opCtx)) { + return; + } + + // This logic runs on updates to the shard's persisted cache of the config server's + // config.collections collection. + // + // If an update occurs to the 'lastRefreshedCollectionVersion' field it notifies the catalog + // cache loader of a new collection version and clears the routing table so the next caller + // with routing information will provoke a routing table refresh. + // + // When 'lastRefreshedCollectionVersion' is in 'update', it means that a chunk metadata + // refresh has finished being applied to the collection's locally persisted metadata store. + // + // If an update occurs to the 'enterCriticalSectionSignal' field, simply clear the routing + // table immediately. This will provoke the next secondary caller to refresh through the + // primary, blocking behind the critical section. + + // Extract which user collection was updated + const auto updatedNss([&] { + std::string coll; + fassertStatusOK( + 40477, + bsonExtractStringField(args.criteria, ShardCollectionType::ns.name(), &coll)); + return NamespaceString(coll); + }()); + + // Parse the '$set' update + BSONElement setElement; + Status setStatus = + bsonExtractTypedField(args.update, StringData("$set"), Object, &setElement); + if (setStatus.isOK()) { + BSONObj setField = setElement.Obj(); + + // Need the WUOW to retain the lock for CollectionVersionLogOpHandler::commit() + AutoGetCollection autoColl(opCtx, updatedNss, MODE_IX); + + if (setField.hasField(ShardCollectionType::lastRefreshedCollectionVersion.name())) { + opCtx->recoveryUnit()->registerChange( + new CollectionVersionLogOpHandler(opCtx, updatedNss)); + } + + if (setField.hasField(ShardCollectionType::enterCriticalSectionCounter.name())) { + // This is a hack to get around CollectionShardingState::refreshMetadata() requiring + // the X lock: markNotShardedAtStepdown() doesn't have a lock check. Temporary + // measure until SERVER-31595 removes the X lock requirement. + CollectionShardingState::get(opCtx, updatedNss)->markNotShardedAtStepdown(); + } + } + } + + if (metadata) { + incrementChunkOnInsertOrUpdate( + opCtx, *metadata->getChunkManager(), args.updatedDoc, args.updatedDoc.objsize()); + } +} + +void ShardServerOpObserver::aboutToDelete(OperationContext* opCtx, + NamespaceString const& nss, + BSONObj const& doc) { + auto& deleteState = getDeleteState(opCtx); + auto* css = CollectionShardingState::get(opCtx, nss.ns()); + deleteState = css->makeDeleteState(doc); +} + +void ShardServerOpObserver::onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) { + auto& deleteState = getDeleteState(opCtx); + + if (nss.ns() == NamespaceString::kShardConfigCollectionsCollectionName) { + onConfigDeleteInvalidateCachedMetadataAndNotify(opCtx, deleteState.documentKey); + } + + if (nss == NamespaceString::kServerConfigurationNamespace) { + if (auto idElem = deleteState.documentKey["_id"]) { + auto idStr = idElem.str(); + if (idStr == ShardIdentityType::IdName) { + if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { + uasserted(40070, + "cannot delete shardIdentity document while in --shardsvr mode"); + } else { + warning() << "Shard identity document rolled back. Will shut down after " + "finishing rollback."; + ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); + } + } + } + } +} + +repl::OpTime ShardServerOpObserver::onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) { + if (collectionName == NamespaceString::kServerConfigurationNamespace) { + // Dropping system collections is not allowed for end users + invariant(!opCtx->writesAreReplicated()); + invariant(repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()); + + // Can't confirm whether there was a ShardIdentity document or not yet, so assume there was + // one and shut down the process to clear the in-memory sharding state + warning() << "admin.system.version collection rolled back. Will shut down after finishing " + "rollback"; + + ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); + } + + return {}; +} + +} // namespace mongo diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h new file mode 100644 index 00000000000..d15a7266668 --- /dev/null +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/op_observer.h" + +namespace mongo { + +/** + * OpObserver which is installed on the op observers chain when the server is running as a shard + * server (--shardsvr). + */ +class ShardServerOpObserver final : public OpObserver { + MONGO_DISALLOW_COPYING(ShardServerOpObserver); + +public: + ShardServerOpObserver(); + ~ShardServerOpObserver(); + + void onCreateIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + BSONObj indexDoc, + bool fromMigrate) override {} + + void onInserts(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + bool fromMigrate) override; + + void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override; + + void aboutToDelete(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& doc) override; + + void onDelete(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + StmtId stmtId, + bool fromMigrate, + const boost::optional<BSONObj>& deletedDoc) override; + + void onInternalOpMessage(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<UUID> uuid, + const BSONObj& msgObj, + const boost::optional<BSONObj> o2MsgObj) override {} + + void onCreateCollection(OperationContext* opCtx, + Collection* coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex) override {} + + void onCollMod(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const BSONObj& collModCmd, + const CollectionOptions& oldCollOptions, + boost::optional<TTLCollModInfo> ttlInfo) override {} + + void onDropDatabase(OperationContext* opCtx, const std::string& dbName) override {} + + repl::OpTime onDropCollection(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) override; + + void onDropIndex(OperationContext* opCtx, + const NamespaceString& nss, + OptionalCollectionUUID uuid, + const std::string& indexName, + const BSONObj& indexInfo) override {} + + repl::OpTime onRenameCollection(OperationContext* opCtx, + const NamespaceString& fromCollection, + const NamespaceString& toCollection, + OptionalCollectionUUID uuid, + bool dropTarget, + OptionalCollectionUUID dropTargetUUID, + bool stayTemp) override { + return repl::OpTime(); + } + + void onApplyOps(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& applyOpCmd) override {} + + void onEmptyCapped(OperationContext* opCtx, + const NamespaceString& collectionName, + OptionalCollectionUUID uuid) override {} + + void onTransactionCommit(OperationContext* opCtx) override {} + + void onTransactionAbort(OperationContext* opCtx) override {} + + void onReplicationRollback(OperationContext* opCtx, const RollbackObserverInfo& rbInfo) {} +}; + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp index 370a792d716..1b29fe5ceae 100644 --- a/src/mongo/db/s/sharding_state_test.cpp +++ b/src/mongo/db/s/sharding_state_test.cpp @@ -29,48 +29,37 @@ #include "mongo/platform/basic.h" #include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/client/replica_set_monitor.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbdirectclient.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/shard_server_catalog_cache_loader.h" +#include "mongo/db/s/shard_server_op_observer.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/db/server_options.h" -#include "mongo/db/service_context_noop.h" #include "mongo/db/storage/storage_options.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" -#include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/sharding_mongod_test_fixture.h" namespace mongo { +namespace { using executor::RemoteCommandRequest; -namespace { +const std::string kShardName("a"); class ShardingStateTest : public ShardingMongodTestFixture { -public: - ShardingState* shardingState() { - return &_shardingState; - } - - std::string shardName() const { - return _shardName.toString(); - } - protected: // Used to write to set up local collections before exercising server logic. std::unique_ptr<DBDirectClient> _dbDirectClient; void setUp() override { - _shardName = ShardId("a"); - serverGlobalParams.clusterRole = ClusterRole::None; ShardingMongodTestFixture::setUp(); @@ -88,8 +77,9 @@ protected: if (!status.isOK()) { return status; } - // Set the ConnectionString return value on the mock targeter so that later calls to - // the targeter's getConnString() return the appropriate value. + + // Set the ConnectionString return value on the mock targeter so that later calls to the + // targeter's getConnString() return the appropriate value auto configTargeter = RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter()); configTargeter->setConnectionStringReturnValue(configConnStr); @@ -104,12 +94,9 @@ protected: void tearDown() override { _dbDirectClient.reset(); - // Some test cases modify the readOnly value, but the teardown calls below depend on - // readOnly being false, so we reset the value here rather than in setUp(). + // Restore the defaults before calling tearDown storageGlobalParams.readOnly = false; - - // ShardingState initialize can modify ReplicaSetMonitor state. - ReplicaSetMonitor::cleanup(); + serverGlobalParams.overrideShardIdentity = BSONObj(); CatalogCacheLoader::clearForTests(getServiceContext()); @@ -127,27 +114,55 @@ protected: return stdx::make_unique<ShardingCatalogClientImpl>(std::move(distLockManager)); } - std::unique_ptr<CatalogCache> makeCatalogCache() override { - return stdx::make_unique<CatalogCache>(CatalogCacheLoader::get(getServiceContext())); + ShardingState* shardingState() { + return &_shardingState; } private: ShardingState _shardingState; - ShardId _shardName; +}; + +/** + * This class emulates the server being started as a standalone node for the scope for which it is + * used + */ +class ScopedSetStandaloneMode { +public: + ScopedSetStandaloneMode(ServiceContext* serviceContext) : _serviceContext(serviceContext) { + serverGlobalParams.clusterRole = ClusterRole::None; + _serviceContext->setOpObserver(stdx::make_unique<OpObserverRegistry>()); + } + + ~ScopedSetStandaloneMode() { + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + auto makeOpObserver = [&] { + auto opObserver = stdx::make_unique<OpObserverRegistry>(); + opObserver->addObserver(stdx::make_unique<OpObserverImpl>()); + opObserver->addObserver(stdx::make_unique<ConfigServerOpObserver>()); + opObserver->addObserver(stdx::make_unique<ShardServerOpObserver>()); + return opObserver; + }; + + _serviceContext->setOpObserver(makeOpObserver()); + } + +private: + ServiceContext* const _serviceContext; }; TEST_F(ShardingStateTest, ValidShardIdentitySucceeds) { + // Must hold a lock to call initializeFromShardIdentity. Lock::GlobalWrite lk(operationContext()); ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ(kShardName, shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } @@ -158,7 +173,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); shardingState()->setGlobalInitMethodForTest( @@ -196,7 +211,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -204,7 +219,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity2.setShardName(shardName()); + shardIdentity2.setShardName(kShardName); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -215,7 +230,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) { ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ(kShardName, shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } @@ -227,7 +242,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(clusterID); ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity)); @@ -235,7 +250,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ShardIdentityType shardIdentity2; shardIdentity2.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "b:2,c:3", "config")); - shardIdentity2.setShardName(shardName()); + shardIdentity2.setShardName(kShardName); shardIdentity2.setClusterId(clusterID); shardingState()->setGlobalInitMethodForTest( @@ -246,21 +261,18 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) { ASSERT_OK(shardingState()->initializeFromShardIdentity(operationContext(), shardIdentity2)); ASSERT_TRUE(shardingState()->enabled()); - ASSERT_EQ(shardName(), shardingState()->getShardName()); + ASSERT_EQ(kShardName, shardingState()->getShardName()); ASSERT_EQ("config/a:1,b:2", shardRegistry()->getConfigServerConnectionString().toString()); } -// The below tests check for compatible startup parameters for --shardsvr, --overrideShardIdentity, -// and queryableBackup (readOnly) mode. +// The tests below check for different combinations of the compatible startup parameters for +// --shardsvr, --overrideShardIdentity, and queryableBackup (readOnly) mode // readOnly and --shardsvr TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndNoOverrideShardIdentity) { storageGlobalParams.readOnly = true; - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - serverGlobalParams.overrideShardIdentity = BSONObj(); - auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); @@ -269,7 +281,6 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndShardServerAndInvalidOverrideShardIdentity) { storageGlobalParams.readOnly = true; - serverGlobalParams.clusterRole = ClusterRole::ShardServer; serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" @@ -287,7 +298,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -304,7 +315,6 @@ TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndNoOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; - serverGlobalParams.overrideShardIdentity = BSONObj(); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); @@ -317,11 +327,11 @@ TEST_F( InitializeShardingAwarenessIfNeededReadOnlyAndNotShardServerAndInvalidOverrideShardIdentity) { storageGlobalParams.readOnly = true; serverGlobalParams.clusterRole = ClusterRole::None; + serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid"); - auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_EQUALS(ErrorCodes::InvalidOptions, swShardingInitialized.getStatus().code()); @@ -335,7 +345,7 @@ TEST_F(ShardingStateTest, ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -349,7 +359,6 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndInvalidOverrideShardIdentity) { - storageGlobalParams.readOnly = false; serverGlobalParams.overrideShardIdentity = BSON("_id" << "shardIdentity" << "configsvrConnectionString" @@ -370,12 +379,10 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndValidOverrideShardIdentity) { - storageGlobalParams.readOnly = false; - ShardIdentityType shardIdentity; shardIdentity.setConfigsvrConnString( ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); + shardIdentity.setShardName(kShardName); shardIdentity.setClusterId(OID::gen()); ASSERT_OK(shardIdentity.validate()); serverGlobalParams.overrideShardIdentity = shardIdentity.toBSON(); @@ -397,9 +404,6 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndNoShardIdentity) { - storageGlobalParams.readOnly = false; - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - serverGlobalParams.overrideShardIdentity = BSONObj(); auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); @@ -408,23 +412,19 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndInvalidShardIdentity) { - - ASSERT_OK(replicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY)); - - // Insert the shardIdentity doc to disk before setting the clusterRole, since if the clusterRole - // is ShardServer, the OpObserver for inserts will prevent the insert from occurring, since the - // shardIdentity doc is invalid. - serverGlobalParams.clusterRole = ClusterRole::None; - BSONObj invalidShardIdentity = BSON("_id" - << "shardIdentity" - << "configsvrConnectionString" - << "invalid"); - _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), - invalidShardIdentity); - - storageGlobalParams.readOnly = false; - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - serverGlobalParams.overrideShardIdentity = BSONObj(); + // Insert the shardIdentity doc to disk while pretending that we are in "standalone" mode, + // otherwise OpObserver for inserts will prevent the insert from occurring because the + // shardIdentity doc is invalid + { + ScopedSetStandaloneMode standalone(getServiceContext()); + + BSONObj invalidShardIdentity = BSON("_id" + << "shardIdentity" + << "configsvrConnectionString" + << "invalid"); + _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), + invalidShardIdentity); + } auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); @@ -434,28 +434,25 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndShardServerAndValidShardIdentity) { - - ASSERT_OK(replicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY)); - - // Insert the shardIdentity doc to disk before setting the clusterRole, since if the clusterRole - // is ShardServer, the OpObserver for inserts will trigger sharding initialization from the - // inserted doc. - serverGlobalParams.clusterRole = ClusterRole::None; - - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardIdentity.validate()); - BSONObj validShardIdentity = shardIdentity.toBSON(); - - _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), - validShardIdentity); - - storageGlobalParams.readOnly = false; - serverGlobalParams.clusterRole = ClusterRole::ShardServer; - serverGlobalParams.overrideShardIdentity = BSONObj(); + // Insert the shardIdentity doc to disk while pretending that we are in "standalone" mode, + // otherwise OpObserver for inserts will prevent the insert from occurring because the + // shardIdentity doc is invalid + { + ScopedSetStandaloneMode standalone(getServiceContext()); + + BSONObj validShardIdentity = [&] { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + ASSERT_OK(shardIdentity.validate()); + return shardIdentity.toBSON(); + }(); + + _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), + validShardIdentity); + } auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); @@ -467,9 +464,8 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndNoShardIdentity) { - storageGlobalParams.readOnly = false; - serverGlobalParams.clusterRole = ClusterRole::None; - serverGlobalParams.overrideShardIdentity = BSONObj(); + ScopedSetStandaloneMode standalone(getServiceContext()); + auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); @@ -478,18 +474,16 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndInvalidShardIdentity) { + ScopedSetStandaloneMode standalone(getServiceContext()); + _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), BSON("_id" << "shardIdentity" << "configsvrConnectionString" << "invalid")); - storageGlobalParams.readOnly = false; - serverGlobalParams.clusterRole = ClusterRole::None; - serverGlobalParams.overrideShardIdentity = BSONObj(); - - // The shardIdentity doc on disk, even if invalid, is ignored if ClusterRole is None. - // This is to allow fixing the shardIdentity doc by starting without --shardsvr. + // The shardIdentity doc on disk, even if invalid, is ignored if the ClusterRole is None. This + // is to allow fixing the shardIdentity doc by starting without --shardsvr. auto swShardingInitialized = shardingState()->initializeShardingAwarenessIfNeeded(operationContext()); ASSERT_OK(swShardingInitialized); @@ -498,17 +492,17 @@ TEST_F(ShardingStateTest, TEST_F(ShardingStateTest, InitializeShardingAwarenessIfNeededNotReadOnlyAndNotShardServerAndValidShardIdentity) { - storageGlobalParams.readOnly = false; - serverGlobalParams.clusterRole = ClusterRole::None; - serverGlobalParams.overrideShardIdentity = BSONObj(); - - ShardIdentityType shardIdentity; - shardIdentity.setConfigsvrConnString( - ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); - shardIdentity.setShardName(shardName()); - shardIdentity.setClusterId(OID::gen()); - ASSERT_OK(shardIdentity.validate()); - BSONObj validShardIdentity = shardIdentity.toBSON(); + ScopedSetStandaloneMode standalone(getServiceContext()); + + BSONObj validShardIdentity = [&] { + ShardIdentityType shardIdentity; + shardIdentity.setConfigsvrConnString( + ConnectionString(ConnectionString::SET, "a:1,b:2", "config")); + shardIdentity.setShardName(kShardName); + shardIdentity.setClusterId(OID::gen()); + ASSERT_OK(shardIdentity.validate()); + return shardIdentity.toBSON(); + }(); _dbDirectClient->insert(NamespaceString::kServerConfigurationNamespace.toString(), validShardIdentity); diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp index cc1401b6517..4f69e5eb7b9 100644 --- a/src/mongo/db/service_context_d_test_fixture.cpp +++ b/src/mongo/db/service_context_d_test_fixture.cpp @@ -41,8 +41,6 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/op_observer_noop.h" #include "mongo/db/op_observer_registry.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/service_context.h" #include "mongo/db/service_context_d.h" #include "mongo/db/storage/storage_options.h" #include "mongo/stdx/memory.h" diff --git a/src/mongo/db/service_context_d_test_fixture.h b/src/mongo/db/service_context_d_test_fixture.h index 402a1837724..35932b64814 100644 --- a/src/mongo/db/service_context_d_test_fixture.h +++ b/src/mongo/db/service_context_d_test_fixture.h @@ -28,13 +28,12 @@ #pragma once +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" #include "mongo/unittest/unittest.h" namespace mongo { -class ServiceContext; -class OperationContext; - /** * Test fixture class for tests that use either the "ephemeralForTest" or "devnull" storage engines. */ |