author     Sam Dunietz <sam.dunietz@10gen.com>   2016-07-28 15:22:44 -0400
committer  Sam Dunietz <sam.dunietz@10gen.com>   2016-07-28 15:22:51 -0400
commit     9a776eae4f669fdcfae94c41c0cbbea662d36c94 (patch)
tree       ae068722c8baf5e433b50d3ebe5bd688ccda8afe /src
parent     35c6b03c67346433d481a0e1be7a49977997b030 (diff)
download   mongo-9a776eae4f669fdcfae94c41c0cbbea662d36c94.tar.gz
SERVER-24367 Implement CollectionRangeDeleter task lifetime management
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/client.cpp                            |   2
-rw-r--r--  src/mongo/db/repl/rs_rollback_test.cpp             |   2
-rw-r--r--  src/mongo/db/s/SConscript                          |   1
-rw-r--r--  src/mongo/db/s/collection_range_deleter.cpp        |  21
-rw-r--r--  src/mongo/db/s/collection_range_deleter.h          |   3
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp       |   7
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h         |   2
-rw-r--r--  src/mongo/db/s/collection_sharding_state_test.cpp  |  22
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp                |  15
-rw-r--r--  src/mongo/db/s/metadata_manager.h                  |  10
-rw-r--r--  src/mongo/db/s/metadata_manager_test.cpp           | 232
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                  |  26
-rw-r--r--  src/mongo/db/s/sharding_state.h                    |  25
-rw-r--r--  src/mongo/db/service_context_d_test_fixture.cpp    |   7
-rw-r--r--  src/mongo/db/service_context_d_test_fixture.h      |   7
15 files changed, 251 insertions, 131 deletions
diff --git a/src/mongo/db/client.cpp b/src/mongo/db/client.cpp
index 2a665268811..6776239f036 100644
--- a/src/mongo/db/client.cpp
+++ b/src/mongo/db/client.cpp
@@ -153,7 +153,7 @@ ClientBasic* ClientBasic::getCurrent() {
 
 Client& cc() {
     Client* c = currentClient.getMake()->get();
-    verify(c);
+    invariant(c);
     return *c;
 }
 
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 096e3902bed..f90546aeb94 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -138,7 +138,6 @@ private:
 
 void RSRollbackTest::setUp() {
     ServiceContextMongoDTest::setUp();
-    Client::initThreadIfNotAlready();
     _txn = cc().makeOperationContext();
     _coordinator = new ReplicationCoordinatorRollbackMock();
 
@@ -158,6 +157,7 @@ void RSRollbackTest::tearDown() {
         invariant(mongo::dbHolder().closeAll(_txn.get(), unused, false));
     }
     _txn.reset();
+    ServiceContextMongoDTest::tearDown();
 
     setGlobalReplicationCoordinator(nullptr);
 }
 
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 67300543db0..072bf231adf 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -167,6 +167,7 @@ env.CppUnitTest(
     LIBDEPS=[
         '$BUILD_DIR/mongo/client/remote_command_targeter_mock',
         '$BUILD_DIR/mongo/db/serveronly',
+        '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
         '$BUILD_DIR/mongo/executor/network_test_env',
         '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
         '$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
 
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
index f08a1608a42..ec9a16c209f 100644
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ b/src/mongo/db/s/collection_range_deleter.cpp
@@ -28,17 +28,34 @@
 
 #include "mongo/platform/basic.h"
 
+#include "mongo/db/client.h"
 #include "mongo/db/s/collection_range_deleter.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 
+using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+
 CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {}
 
 void CollectionRangeDeleter::run() {
-    // TODO: not implemented
+    Client::initThread(getThreadName().c_str());
+    ON_BLOCK_EXIT([&] { Client::destroy(); });
+    auto txn = cc().makeOperationContext().get();
+    bool hasNextRangeToClean = cleanupNextRange(txn);
+
+    // If there are more ranges to run, we add <this> back onto the task executor to run again.
+    if (hasNextRangeToClean) {
+        auto executor = ShardingState::get(txn)->getRangeDeleterTaskExecutor();
+        executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); });
+    } else {
+        delete this;
+    }
 }
 
-bool CollectionRangeDeleter::cleanupNextRange() {
+bool CollectionRangeDeleter::cleanupNextRange(OperationContext* txn) {
    // TODO: not implemented
    return false;
 }
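The lifetime scheme introduced in run() above — a heap-allocated task that either re-posts itself onto the executor or deletes itself when it runs out of work — is easier to see stripped of the MongoDB plumbing. A minimal standalone sketch (toy Executor, all names hypothetical, not MongoDB's API):

```cpp
#include <functional>
#include <iostream>
#include <queue>

// Toy single-threaded "executor": runs queued callbacks in FIFO order.
class Executor {
public:
    void scheduleWork(std::function<void()> task) {
        _queue.push(std::move(task));
    }
    void drain() {
        while (!_queue.empty()) {
            auto task = std::move(_queue.front());
            _queue.pop();
            task();
        }
    }

private:
    std::queue<std::function<void()>> _queue;
};

// Heap-allocated task that manages its own lifetime: each pass either
// reschedules the task for another round or deletes it when no work remains.
class SelfReschedulingTask {
public:
    SelfReschedulingTask(Executor* executor, int passes)
        : _executor(executor), _passesLeft(passes) {}

    void run() {
        std::cout << "pass, " << _passesLeft << " remaining\n";
        if (--_passesLeft > 0) {
            _executor->scheduleWork([this] { run(); });  // more work: go again
        } else {
            delete this;  // done: the task frees itself, nobody else owns it
        }
    }

private:
    Executor* _executor;
    int _passesLeft;
};

int main() {
    Executor executor;
    auto* task = new SelfReschedulingTask(&executor, 3);
    executor.scheduleWork([task] { task->run(); });
    executor.drain();
}
```

The invariant is single ownership: whichever scheduled callback currently holds `this` either posts the next pass or frees the object, so no external code has to track the deleter's lifetime.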
diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h
index c447bd750e2..cf3599c17ae 100644
--- a/src/mongo/db/s/collection_range_deleter.h
+++ b/src/mongo/db/s/collection_range_deleter.h
@@ -29,6 +29,7 @@
 
 #include "mongo/base/disallow_copying.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
 
 namespace mongo {
 
@@ -51,7 +52,7 @@ public:
      *
      * Returns true if there are more entries in rangesToClean, false if the set is empty.
      */
-    bool cleanupNextRange();
+    bool cleanupNextRange(OperationContext* txn);
 
 private:
     NamespaceString _nss;
 
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index ba36ff9e546..47e66ca914b 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -44,6 +44,7 @@
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/db/s/type_shard_identity.h"
 #include "mongo/db/server_options.h"
+#include "mongo/db/service_context.h"
 #include "mongo/s/catalog/sharding_catalog_manager.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/chunk_version.h"
@@ -129,8 +130,8 @@ private:
 
 }  // unnamed namespace
 
-CollectionShardingState::CollectionShardingState(NamespaceString nss)
-    : _nss(std::move(nss)), _metadataManager{} {}
+CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
+    : _nss(std::move(nss)), _metadataManager{sc, _nss} {}
 
 CollectionShardingState::~CollectionShardingState() {
     invariant(!_sourceMgr);
@@ -147,7 +148,7 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
     dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IS));
 
     ShardingState* const shardingState = ShardingState::get(txn);
-    return shardingState->getNS(ns);
+    return shardingState->getNS(ns, txn);
 }
 
 ScopedCollectionMetadata CollectionShardingState::getMetadata() {
 
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 24ed2cb4ffb..40ce2829e53 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -59,7 +59,7 @@ public:
     /**
      * Instantiates a new per-collection sharding state as unsharded.
      */
-    CollectionShardingState(NamespaceString nss);
+    CollectionShardingState(ServiceContext* sc, NamespaceString nss);
     ~CollectionShardingState();
 
     /**
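Threading a ServiceContext* through CollectionShardingState into MetadataManager matters because ShardingState hangs off the ServiceContext as a decoration (see `ServiceContext::declareDecoration<ShardingState>()` in sharding_state.cpp further down), so it is reachable without an OperationContext. A rough sketch of the decoration idea itself — toy types, not MongoDB's actual implementation, which registers decorations at static-init time:

```cpp
#include <memory>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>

// Toy version of the decoration idea: components hang exactly one instance
// of their own state off a shared context object, keyed by type.
class Context {
public:
    template <typename T>
    T& decoration() {
        std::shared_ptr<void>& slot = _decorations[std::type_index(typeid(T))];
        if (!slot) {
            slot = std::make_shared<T>();  // created lazily, destroyed with the Context
        }
        return *static_cast<T*>(slot.get());
    }

private:
    std::unordered_map<std::type_index, std::shared_ptr<void>> _decorations;
};

// Stand-in for ShardingState; any holder of a Context* can reach it.
struct ShardingStateLike {
    int scheduledCleanups = 0;
};

int main() {
    Context serviceContext;
    // This is why MetadataManager only needs a ServiceContext*: the shared
    // ShardingState-like singleton is recoverable from the context alone.
    serviceContext.decoration<ShardingStateLike>().scheduledCleanups++;
}
```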
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 818ae80af3c..8a7ca715141 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -78,8 +78,14 @@ public:
         return _initCallCount;
     }
 
-private:
+    ServiceContext* getServiceContext() {
+        return &_service;
+    }
+
+protected:
     ServiceContextNoop _service;
+
+private:
     ServiceContext::UniqueClient _client;
     ServiceContext::UniqueOperationContext _opCtx;
 
@@ -87,7 +93,8 @@ private:
 };
 
 TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
-    CollectionShardingState collShardingState(NamespaceString::kConfigCollectionNamespace);
+    CollectionShardingState collShardingState(&_service,
+                                              NamespaceString::kConfigCollectionNamespace);
 
     ShardIdentityType shardIdentity;
     shardIdentity.setConfigsvrConnString(
@@ -106,7 +113,8 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
 }
 
 TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
-    CollectionShardingState collShardingState(NamespaceString::kConfigCollectionNamespace);
+    CollectionShardingState collShardingState(getServiceContext(),
+                                              NamespaceString::kConfigCollectionNamespace);
 
     ShardIdentityType shardIdentity;
     shardIdentity.setConfigsvrConnString(
@@ -125,7 +133,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
 }
 
 TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentity) {
-    CollectionShardingState collShardingState(NamespaceString("admin.user"));
+    CollectionShardingState collShardingState(getServiceContext(), NamespaceString("admin.user"));
 
     ShardIdentityType shardIdentity;
     shardIdentity.setConfigsvrConnString(
@@ -144,7 +152,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
 }
 
 TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument) {
-    CollectionShardingState collShardingState(NamespaceString::kConfigCollectionNamespace);
+    CollectionShardingState collShardingState(getServiceContext(),
+                                              NamespaceString::kConfigCollectionNamespace);
 
     ShardIdentityType shardIdentity;
     shardIdentity.setShardName("a");
@@ -153,7 +162,8 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument
 }
 
 TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNotInserted) {
-    CollectionShardingState collShardingState(NamespaceString::kConfigCollectionNamespace);
+    CollectionShardingState collShardingState(getServiceContext(),
+                                              NamespaceString::kConfigCollectionNamespace);
 
     WriteUnitOfWork wuow(txn());
     collShardingState.onInsertOp(txn(), BSON("_id" << 1));
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index 84fb91e592f..d438f89f505 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -31,14 +31,20 @@
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/range_arithmetic.h"
+#include "mongo/db/s/collection_range_deleter.h"
 #include "mongo/db/s/metadata_manager.h"
+#include "mongo/db/s/sharding_state.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/log.h"
 
 namespace mongo {
 
-MetadataManager::MetadataManager()
-    : _activeMetadataTracker(stdx::make_unique<CollectionMetadataTracker>(nullptr)) {}
+using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+
+MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss)
+    : _nss(std::move(nss)),
+      _serviceContext(sc),
+      _activeMetadataTracker(stdx::make_unique<CollectionMetadataTracker>(nullptr)) {}
 
 MetadataManager::~MetadataManager() {
     stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
@@ -320,6 +326,11 @@ void MetadataManager::_addRangeToClean_inlock(const ChunkRange& range) {
     invariant(!rangeMapOverlaps(_rangesToClean, range.getMin(), range.getMax()));
     invariant(!rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax()));
     _rangesToClean.insert(std::make_pair(range.getMin().getOwned(), range.getMax().getOwned()));
+
+    // If _rangesToClean was previously empty, we need to start the collection range deleter
+    if (_rangesToClean.size() == 1UL) {
+        ShardingState::get(_serviceContext)->scheduleCleanup(_nss);
+    }
 }
 
 void MetadataManager::removeRangeToClean(const ChunkRange& range) {
 
diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h
index 319db8180fb..0100ef59d28 100644
--- a/src/mongo/db/s/metadata_manager.h
+++ b/src/mongo/db/s/metadata_manager.h
@@ -32,8 +32,11 @@
 #include <memory>
 
 #include "mongo/base/disallow_copying.h"
+#include "mongo/db/namespace_string.h"
 #include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/service_context.h"
 #include "mongo/s/catalog/type_chunk.h"
+
 #include "mongo/stdx/memory.h"
 
 namespace mongo {
@@ -44,7 +47,7 @@ class MetadataManager {
     MONGO_DISALLOW_COPYING(MetadataManager);
 
 public:
-    MetadataManager();
+    MetadataManager(ServiceContext* sc, NamespaceString nss);
     ~MetadataManager();
 
     /**
@@ -127,6 +130,11 @@ private:
 
     void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);
 
+    const NamespaceString _nss;
+
+    // ServiceContext from which to obtain instances of global support objects.
+    ServiceContext* _serviceContext;
+
     // Mutex to protect the state below
     stdx::mutex _managerLock;
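The new branch in `_addRangeToClean_inlock` kicks the deleter only when the set transitions from empty to non-empty: while the map already holds entries, a deleter task is assumed to be cycling and will pick up new ranges on its next pass. A minimal sketch of the idiom (simplified int ranges and hypothetical names, not the real RangeMap types):

```cpp
#include <functional>
#include <map>

// "Kick only on empty -> non-empty": only the insertion that makes the set
// non-empty needs to schedule a cleanup task; later insertions are picked up
// by the task that is already running.
class RangesToClean {
public:
    explicit RangesToClean(std::function<void()> scheduleCleanup)
        : _scheduleCleanup(std::move(scheduleCleanup)) {}

    void add(int min, int max) {
        _ranges.emplace(min, max);
        if (_ranges.size() == 1) {  // the set was empty before this insert
            _scheduleCleanup();
        }
    }

private:
    std::map<int, int> _ranges;  // stand-in for the BSON-keyed range map
    std::function<void()> _scheduleCleanup;
};

int main() {
    int kicks = 0;
    RangesToClean ranges([&kicks] { ++kicks; });
    ranges.add(0, 10);   // empty -> non-empty: schedules a cleanup
    ranges.add(10, 20);  // already non-empty: no extra schedule
    return kicks == 1 ? 0 : 1;
}
```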
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index 412b5e7ba89..1e50b85bea3 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -31,8 +31,13 @@
 #include "mongo/db/s/metadata_manager.h"
 
 #include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/client.h"
 #include "mongo/db/jsobj.h"
+#include "mongo/db/namespace_string.h"
 #include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/service_context_d_test_fixture.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/unittest/unittest.h"
@@ -44,13 +49,23 @@ using unittest::assertGet;
 
 namespace {
 
-std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
-    return stdx::make_unique<CollectionMetadata>(BSON("key" << 1), ChunkVersion(1, 0, OID::gen()));
-}
+class MetadataManagerTest : public ServiceContextMongoDTest {
+protected:
+    void setUp() override {
+        ServiceContextMongoDTest::setUp();
+        ShardingState::get(getServiceContext())
+            ->setScheduleCleanupFunctionForTest([](const NamespaceString& nss) {});
+    }
+
+    std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
+        return stdx::make_unique<CollectionMetadata>(BSON("key" << 1),
+                                                     ChunkVersion(1, 0, OID::gen()));
+    }
+};
 
-TEST(MetadataManager, SetAndGetActiveMetadata) {
-    MetadataManager manager;
+TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
 
     std::unique_ptr<CollectionMetadata> cm = makeEmptyMetadata();
     auto cmPtr = cm.get();
@@ -60,8 +75,9 @@ TEST(MetadataManager, SetAndGetActiveMetadata) {
     ASSERT_EQ(cmPtr, scopedMetadata.getMetadata());
 };
 
-TEST(MetadataManager, RefreshActiveMetadata) {
-    MetadataManager manager;
+
+TEST_F(MetadataManagerTest, ResetActiveMetadata) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
     manager.refreshActiveMetadata(makeEmptyMetadata());
 
     ScopedCollectionMetadata scopedMetadata1 = manager.getActiveMetadata();
@@ -78,38 +94,38 @@ TEST(MetadataManager, RefreshActiveMetadata) {
     ASSERT_EQ(cm2Ptr, scopedMetadata2.getMetadata());
 };
 
-TEST(MetadataManager, AddAndRemoveRanges) {
-    MetadataManager mm;
+TEST_F(MetadataManagerTest, AddAndRemoveRanges) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
     ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
     ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
 
-    mm.addRangeToClean(cr1);
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 1UL);
-    mm.removeRangeToClean(cr1);
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 0UL);
+    manager.addRangeToClean(cr1);
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
+    manager.removeRangeToClean(cr1);
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
 
-    mm.addRangeToClean(cr1);
-    mm.addRangeToClean(cr2);
-    mm.removeRangeToClean(cr1);
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 1UL);
-    auto ranges = mm.getCopyOfRangesToClean();
+    manager.addRangeToClean(cr1);
+    manager.addRangeToClean(cr2);
+    manager.removeRangeToClean(cr1);
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
+    auto ranges = manager.getCopyOfRangesToClean();
     auto it = ranges.find(cr2.getMin());
     ChunkRange remainingChunk = ChunkRange(it->first, it->second);
     ASSERT_EQ(remainingChunk.toString(), cr2.toString());
-    mm.removeRangeToClean(cr2);
+    manager.removeRangeToClean(cr2);
 }
 
 // Tests that a removal in the middle of an existing ChunkRange results in
 // two correct chunk ranges.
-TEST(MetadataManager, RemoveRangeInMiddleOfRange) {
-    MetadataManager mm;
+TEST_F(MetadataManagerTest, RemoveRangeInMiddleOfRange) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
     ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
 
-    mm.addRangeToClean(cr1);
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 2UL);
+    manager.addRangeToClean(cr1);
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
 
-    auto ranges = mm.getCopyOfRangesToClean();
+    auto ranges = manager.getCopyOfRangesToClean();
     auto it = ranges.find(BSON("key" << 0));
     ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 4));
     ChunkRange remainingChunk = ChunkRange(it->first, it->second);
@@ -120,59 +136,59 @@
     remainingChunk = ChunkRange(it->first, it->second);
     ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
 
-    mm.removeRangeToClean(cr1);
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 0UL);
+    manager.removeRangeToClean(cr1);
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
 }
 // Tests removals that overlap with just one ChunkRange.
-TEST(MetadataManager, RemoveRangeWithSingleRangeOverlap) {
-    MetadataManager mm;
+TEST_F(MetadataManagerTest, RemoveRangeWithSingleRangeOverlap) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
     ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
 
-    mm.addRangeToClean(cr1);
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 5)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 1UL);
-    auto ranges = mm.getCopyOfRangesToClean();
+    manager.addRangeToClean(cr1);
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 5)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
+    auto ranges = manager.getCopyOfRangesToClean();
     auto it = ranges.find(BSON("key" << 5));
     ChunkRange remainingChunk = ChunkRange(it->first, it->second);
     ChunkRange expectedChunk = ChunkRange(BSON("key" << 5), BSON("key" << 10));
     ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
 
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 1UL);
-    ranges = mm.getCopyOfRangesToClean();
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
+    ranges = manager.getCopyOfRangesToClean();
     it = ranges.find(BSON("key" << 6));
     remainingChunk = ChunkRange(it->first, it->second);
     expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
     ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
 
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 9), BSON("key" << 13)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 1UL);
-    ranges = mm.getCopyOfRangesToClean();
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 9), BSON("key" << 13)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
+    ranges = manager.getCopyOfRangesToClean();
     it = ranges.find(BSON("key" << 6));
     remainingChunk = ChunkRange(it->first, it->second);
     expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 9));
     ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
 
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 0UL);
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
 }
 
 // Tests removals that overlap with more than one ChunkRange.
-TEST(MetadataManager, RemoveRangeWithMultipleRangeOverlaps) {
-    MetadataManager mm;
+TEST_F(MetadataManagerTest, RemoveRangeWithMultipleRangeOverlaps) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
     ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
     ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
     ChunkRange cr3 = ChunkRange(BSON("key" << 20), BSON("key" << 30));
 
-    mm.addRangeToClean(cr1);
-    mm.addRangeToClean(cr2);
-    mm.addRangeToClean(cr3);
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 3UL);
+    manager.addRangeToClean(cr1);
+    manager.addRangeToClean(cr2);
+    manager.addRangeToClean(cr3);
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 3UL);
 
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 8), BSON("key" << 22)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 2UL);
-    auto ranges = mm.getCopyOfRangesToClean();
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 8), BSON("key" << 22)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
+    auto ranges = manager.getCopyOfRangesToClean();
     auto it = ranges.find(BSON("key" << 0));
     ChunkRange remainingChunk = ChunkRange(it->first, it->second);
     ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 8));
@@ -182,117 +198,117 @@
     expectedChunk = ChunkRange(BSON("key" << 22), BSON("key" << 30));
     ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
 
-    mm.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 30)));
-    ASSERT_EQ(mm.getCopyOfRangesToClean().size(), 0UL);
+    manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 30)));
+    ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
 }
-TEST(MetadataManager, RefreshAfterSuccessfulMigrationSinglePending) {
-    MetadataManager mm;
-    mm.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    manager.refreshActiveMetadata(makeEmptyMetadata());
 
     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    mm.beginReceive(cr1);
-    ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 1UL);
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 0UL);
+    manager.beginReceive(cr1);
+    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
 
-    ChunkVersion version = mm.getActiveMetadata()->getCollVersion();
+    ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
     version.incMajor();
 
-    mm.refreshActiveMetadata(
-        mm.getActiveMetadata()->clonePlusChunk(cr1.getMin(), cr1.getMax(), version));
-    ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 0UL);
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 1UL);
+    manager.refreshActiveMetadata(
+        manager.getActiveMetadata()->clonePlusChunk(cr1.getMin(), cr1.getMax(), version));
+    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
 }
 
-TEST(MetadataManager, RefreshAfterSuccessfulMigrationMultiplePending) {
-    MetadataManager mm;
-    mm.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    manager.refreshActiveMetadata(makeEmptyMetadata());
 
     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    mm.beginReceive(cr1);
+    manager.beginReceive(cr1);
 
     const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
-    mm.beginReceive(cr2);
+    manager.beginReceive(cr2);
 
-    ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 2UL);
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 0UL);
+    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
 
     {
-        ChunkVersion version = mm.getActiveMetadata()->getCollVersion();
+        ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
         version.incMajor();
 
-        mm.refreshActiveMetadata(
-            mm.getActiveMetadata()->clonePlusChunk(cr1.getMin(), cr1.getMax(), version));
-        ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 1UL);
-        ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 1UL);
+        manager.refreshActiveMetadata(
+            manager.getActiveMetadata()->clonePlusChunk(cr1.getMin(), cr1.getMax(), version));
+        ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
+        ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
     }
 
     {
-        ChunkVersion version = mm.getActiveMetadata()->getCollVersion();
+        ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
         version.incMajor();
 
-        mm.refreshActiveMetadata(
-            mm.getActiveMetadata()->clonePlusChunk(cr2.getMin(), cr2.getMax(), version));
-        ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 0UL);
-        ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 2UL);
+        manager.refreshActiveMetadata(
+            manager.getActiveMetadata()->clonePlusChunk(cr2.getMin(), cr2.getMax(), version));
+        ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
+        ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 2UL);
     }
 }
 
-TEST(MetadataManager, RefreshAfterNotYetCompletedMigrationMultiplePending) {
-    MetadataManager mm;
-    mm.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    manager.refreshActiveMetadata(makeEmptyMetadata());
 
     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    mm.beginReceive(cr1);
+    manager.beginReceive(cr1);
 
     const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
-    mm.beginReceive(cr2);
+    manager.beginReceive(cr2);
 
-    ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 2UL);
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 0UL);
+    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
 
-    ChunkVersion version = mm.getActiveMetadata()->getCollVersion();
+    ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
     version.incMajor();
 
-    mm.refreshActiveMetadata(
-        mm.getActiveMetadata()->clonePlusChunk(BSON("key" << 50), BSON("key" << 60), version));
-    ASSERT_EQ(mm.getCopyOfReceivingChunks().size(), 2UL);
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 1UL);
+    manager.refreshActiveMetadata(
+        manager.getActiveMetadata()->clonePlusChunk(BSON("key" << 50), BSON("key" << 60), version));
+    ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
 }
 
-TEST(MetadataManager, BeginReceiveWithOverlappingRange) {
-    MetadataManager mm;
-    mm.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    manager.refreshActiveMetadata(makeEmptyMetadata());
 
     const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
-    mm.beginReceive(cr1);
+    manager.beginReceive(cr1);
 
     const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
-    mm.beginReceive(cr2);
+    manager.beginReceive(cr2);
 
     const ChunkRange crOverlap(BSON("key" << 5), BSON("key" << 35));
-    mm.beginReceive(crOverlap);
+    manager.beginReceive(crOverlap);
 
-    const auto copyOfPending = mm.getCopyOfReceivingChunks();
+    const auto copyOfPending = manager.getCopyOfReceivingChunks();
 
     ASSERT_EQ(copyOfPending.size(), 1UL);
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 0UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
 
     const auto it = copyOfPending.find(BSON("key" << 5));
     ASSERT(it != copyOfPending.end());
     ASSERT_EQ(it->second, BSON("key" << 35));
 }
 
-TEST(MetadataManager, RefreshMetadataAfterDropAndRecreate) {
-    MetadataManager mm;
-    mm.refreshActiveMetadata(makeEmptyMetadata());
+TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
+    MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+    manager.refreshActiveMetadata(makeEmptyMetadata());
 
     {
-        auto metadata = mm.getActiveMetadata();
+        auto metadata = manager.getActiveMetadata();
         ChunkVersion newVersion = metadata->getCollVersion();
         newVersion.incMajor();
 
-        mm.refreshActiveMetadata(
+        manager.refreshActiveMetadata(
             metadata->clonePlusChunk(BSON("key" << 0), BSON("key" << 10), newVersion));
     }
 
@@ -301,11 +317,11 @@
     ChunkVersion newVersion = recreateMetadata->getCollVersion();
     newVersion.incMajor();
 
-    mm.refreshActiveMetadata(
+    manager.refreshActiveMetadata(
         recreateMetadata->clonePlusChunk(BSON("key" << 20), BSON("key" << 30), newVersion));
-    ASSERT_EQ(mm.getActiveMetadata()->getChunks().size(), 1UL);
+    ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
 
-    const auto chunkEntry = mm.getActiveMetadata()->getChunks().begin();
+    const auto chunkEntry = manager.getActiveMetadata()->getChunks().begin();
     ASSERT_EQ(BSON("key" << 20), chunkEntry->first);
     ASSERT_EQ(BSON("key" << 30), chunkEntry->second);
 }
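The removal tests above pin down the interval arithmetic expected of `removeRangeToClean`: partial overlaps are trimmed, and deleting a span that falls strictly inside one range splits it in two. A standalone sketch of that behavior over half-open ranges (int keys standing in for BSON shard-key bounds; an illustration of the tested behavior, not MongoDB's RangeMap code):

```cpp
#include <cassert>
#include <map>

using RangeMap = std::map<int, int>;  // min -> max, half-open [min, max)

// Delete [delMin, delMax) from the set of ranges, trimming partial overlaps
// and splitting a range when the deleted span lies strictly inside it.
void removeRange(RangeMap& ranges, int delMin, int delMax) {
    auto it = ranges.upper_bound(delMin);
    if (it != ranges.begin())
        --it;  // the previous range may straddle delMin
    while (it != ranges.end() && it->first < delMax) {
        int min = it->first, max = it->second;
        if (max <= delMin) {  // entirely to the left; no overlap
            ++it;
            continue;
        }
        it = ranges.erase(it);
        if (min < delMin)
            ranges.emplace(min, delMin);  // keep the left remainder
        if (max > delMax)
            ranges.emplace(delMax, max);  // keep the right remainder
    }
}

int main() {
    RangeMap ranges{{0, 10}};
    removeRange(ranges, 4, 6);  // middle removal: [0,10) -> [0,4) and [6,10)
    assert(ranges.size() == 2 && ranges[0] == 4 && ranges[6] == 10);

    RangeMap multi{{0, 10}, {10, 20}, {20, 30}};
    removeRange(multi, 8, 22);  // trims the outer ranges, deletes the middle one
    assert(multi.size() == 2 && multi[0] == 8 && multi[22] == 30);
}
```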
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 999f74a6be0..98fc7037c40 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -78,6 +78,8 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
+using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+
 namespace {
 
 const auto getShardingState = ServiceContext::declareDecoration<ShardingState>();
@@ -120,7 +122,13 @@ ShardingState::ShardingState()
     : _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
       _initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
       _configServerTickets(kMaxConfigServerRefreshThreads),
-      _globalInit(&initializeGlobalShardingStateForMongod) {}
+      _globalInit(&initializeGlobalShardingStateForMongod),
+      _scheduleWorkFn([this](NamespaceString nss) {
+          getRangeDeleterTaskExecutor()->scheduleWork([=](const CallbackArgs& cbArgs) {
+              CollectionRangeDeleter* rd = new CollectionRangeDeleter(nss);
+              rd->run();
+          });
+      }) {}
 
 ShardingState::~ShardingState() = default;
 
@@ -220,12 +228,14 @@ void ShardingState::setShardName(const string& name) {
     }
 }
 
-CollectionShardingState* ShardingState::getNS(const std::string& ns) {
+CollectionShardingState* ShardingState::getNS(const std::string& ns, OperationContext* txn) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     CollectionShardingStateMap::iterator it = _collections.find(ns);
     if (it == _collections.end()) {
-        auto inserted = _collections.insert(
-            make_pair(ns, stdx::make_unique<CollectionShardingState>(NamespaceString(ns))));
+        auto inserted =
+            _collections.insert(make_pair(ns,
+                                          stdx::make_unique<CollectionShardingState>(
+                                              txn->getServiceContext(), NamespaceString(ns))));
         invariant(inserted.second);
         it = std::move(inserted.first);
     }
@@ -250,6 +260,14 @@ void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
     _globalInit = func;
 }
 
+void ShardingState::setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn) {
+    _scheduleWorkFn = fn;
+}
+
+void ShardingState::scheduleCleanup(const NamespaceString& nss) {
+    _scheduleWorkFn(nss);
+}
+
 Status ShardingState::onStaleShardVersion(OperationContext* txn,
                                           const NamespaceString& nss,
                                           const ChunkVersion& expectedVersion) {
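The `_scheduleWorkFn` member gives ShardingState a single injectable seam: the production default posts a new, self-owning CollectionRangeDeleter onto the task executor, while unit tests (metadata_manager_test.cpp above) swap in a no-op so that adding a range to clean never touches a real executor. The shape of the pattern, with assumed simplified names:

```cpp
#include <functional>
#include <iostream>
#include <string>

// Sketch of the test seam the commit introduces (hypothetical names): the
// scheduling behavior lives behind a std::function that tests can replace.
class ShardingStateLike {
public:
    using ScheduleFn = std::function<void(const std::string& /*nss*/)>;

    ShardingStateLike()
        : _scheduleWorkFn([](const std::string& nss) {
              // Production default: would post real work to a task executor.
              std::cout << "would start a range deleter for " << nss << "\n";
          }) {}

    void scheduleCleanup(const std::string& nss) {
        _scheduleWorkFn(nss);  // single indirection point, swappable in tests
    }

    void setScheduleCleanupFunctionForTest(ScheduleFn fn) {
        _scheduleWorkFn = std::move(fn);
    }

private:
    ScheduleFn _scheduleWorkFn;
};

int main() {
    ShardingStateLike state;
    state.setScheduleCleanupFunctionForTest([](const std::string&) {});  // no-op, as in the tests
    state.scheduleCleanup("TestDb.CollDB");  // safe: nothing is actually scheduled
}
```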
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index e9605c6b717..a27b2841eda 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -36,7 +36,9 @@
 #include "mongo/bson/oid.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/s/active_migrations_registry.h"
+#include "mongo/db/s/collection_range_deleter.h"
 #include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/executor/task_executor.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/stdx/functional.h"
 #include "mongo/stdx/memory.h"
@@ -73,6 +75,10 @@ public:
     using GlobalInitFunc =
         stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;
 
+    // Signature for the callback function used by the MetadataManager to inform the
+    // sharding subsystem that there is range cleanup work to be done.
+    using RangeDeleterCleanupNotificationFunc = stdx::function<void(const NamespaceString&)>;
+
     ShardingState();
     ~ShardingState();
 
@@ -155,7 +161,7 @@ public:
      */
     void setShardName(const std::string& shardName);
 
-    CollectionShardingState* getNS(const std::string& ns);
+    CollectionShardingState* getNS(const std::string& ns, OperationContext* txn);
 
     /**
      * Clears the collection metadata cache after step down.
@@ -243,10 +249,23 @@ public:
     void setGlobalInitMethodForTest(GlobalInitFunc func);
 
     /**
+     * Schedules for the range to clean of the given namespace to be deleted.
+     * Behavior can be modified through setScheduleCleanupFunctionForTest.
+     */
+    void scheduleCleanup(const NamespaceString& nss);
+
+    /**
      * Returns a pointer to the collection range deleter task executor.
      */
     executor::ThreadPoolTaskExecutor* getRangeDeleterTaskExecutor();
 
+    /**
+     * Sets the function used by scheduleWorkOnRangeDeleterTaskExecutor to
+     * schedule work. Used for mocking the executor for testing. See the ShardingState
+     * for the default implementation of _scheduleWorkFn.
+     */
+    void setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn);
+
 private:
     friend class ScopedRegisterMigration;
 
@@ -363,6 +382,10 @@ private:
     // Function for initializing the external sharding state components not owned here.
     GlobalInitFunc _globalInit;
 
+    // Function for scheduling work on the _rangeDeleterTaskExecutor.
+    // Used in call to scheduleCleanup(NamespaceString).
+    RangeDeleterCleanupNotificationFunc _scheduleWorkFn;
+
     // Task executor for the collection range deleter.
     std::unique_ptr<executor::ThreadPoolTaskExecutor> _rangeDeleterTaskExecutor;
 };
 
diff --git a/src/mongo/db/service_context_d_test_fixture.cpp b/src/mongo/db/service_context_d_test_fixture.cpp
index a2e6edcbb29..5108d9ae2db 100644
--- a/src/mongo/db/service_context_d_test_fixture.cpp
+++ b/src/mongo/db/service_context_d_test_fixture.cpp
@@ -42,10 +42,12 @@
 #include "mongo/db/service_context_d.h"
 #include "mongo/db/storage/storage_options.h"
 #include "mongo/unittest/temp_dir.h"
+#include "mongo/util/scopeguard.h"
 
 namespace mongo {
 
 void ServiceContextMongoDTest::setUp() {
+    Client::initThread(getThreadName().c_str());
     ServiceContext* serviceContext = getGlobalServiceContext();
 
     if (!serviceContext->getGlobalStorageEngine()) {
         // When using the "ephemeralForTest" storage engine, it is fine for the temporary directory
@@ -60,10 +62,15 @@
 }
 
 void ServiceContextMongoDTest::tearDown() {
+    ON_BLOCK_EXIT([&] { Client::destroy(); });
     auto txn = cc().makeOperationContext();
     _dropAllDBs(txn.get());
 }
 
+ServiceContext* ServiceContextMongoDTest::getServiceContext() {
+    return getGlobalServiceContext();
+}
+
 void ServiceContextMongoDTest::_dropAllDBs(OperationContext* txn) {
     dropAllDatabasesExceptLocal(txn);
 
diff --git a/src/mongo/db/service_context_d_test_fixture.h b/src/mongo/db/service_context_d_test_fixture.h
index 2cfb6f1cb89..d6eff14c72d 100644
--- a/src/mongo/db/service_context_d_test_fixture.h
+++ b/src/mongo/db/service_context_d_test_fixture.h
@@ -32,6 +32,7 @@
 
 namespace mongo {
 
+class ServiceContext;
 class OperationContext;
 
 /**
@@ -50,6 +51,12 @@ protected:
     void tearDown() override;
 
     /**
+     * Returns a service context, which is only valid for this instance of the test.
+     * Must not be called before setUp or after tearDown.
+     */
+    ServiceContext* getServiceContext();
+
+    /**
     * Drops all databases. Call this before global ReplicationCoordinator is destroyed -- it is
     * used to drop the databases.
     */
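With Client::initThread() moved into ServiceContextMongoDTest::setUp() and Client::destroy() into tearDown(), derived fixtures no longer call initThreadIfNotAlready() themselves, but they must invoke the base-class methods and release any OperationContext before the Client goes away — exactly the reordering made in rs_rollback_test.cpp above. A sketch of the resulting contract for a hypothetical derived fixture:

```cpp
#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"

namespace mongo {

// Hypothetical derived fixture showing the required call pattern.
class MyMongoDTest : public ServiceContextMongoDTest {
protected:
    void setUp() override {
        ServiceContextMongoDTest::setUp();   // base now runs Client::initThread()
        _txn = cc().makeOperationContext();  // no Client::initThreadIfNotAlready() needed
    }

    void tearDown() override {
        _txn.reset();                          // release before the Client is destroyed
        ServiceContextMongoDTest::tearDown();  // drops databases, then Client::destroy()
    }

    ServiceContext::UniqueOperationContext _txn;
};

}  // namespace mongo
```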