diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-06-21 18:12:57 -0400 |
---|---|---|
committer | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-06-25 15:53:28 -0400 |
commit | 5339c9a55181662545652ab7106c8f4e55109327 (patch) | |
tree | 2e8bc2898fc1ef0095b3dc0b82033fae3dfd3e6a | |
parent | b7ff5816f4d9d468b1875013384e7e51184628a0 (diff) | |
download | mongo-5339c9a55181662545652ab7106c8f4e55109327.tar.gz |
SERVER-35734 Integrate new ChunkWritesTracker and ChunkSplitStateDriver with ChunkSplitter and ShardServerOpObserver
-rw-r--r-- | src/mongo/db/s/SConscript | 22 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_split_state_driver.cpp (renamed from src/mongo/s/chunk_split_state_driver.cpp) | 2 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_split_state_driver.h (renamed from src/mongo/s/chunk_split_state_driver.h) | 0 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_split_state_driver_test.cpp (renamed from src/mongo/s/chunk_split_state_driver_test.cpp) | 2 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_splitter.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/chunk_splitter.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/shard_server_op_observer.cpp | 44 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 20 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 17 | ||||
-rw-r--r-- | src/mongo/s/chunk.h | 40 | ||||
-rw-r--r-- | src/mongo/s/write_ops/cluster_write.cpp | 12 |
11 files changed, 84 insertions, 98 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 0af63a22dc6..2dff123b428 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -61,12 +61,34 @@ env.Library( '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/rw_concern_d', + 'chunk_splitter', 'sharding', 'sharding_api_d', 'sharding_catalog_manager', ], ) +env.Library( + target='chunk_splitter', + source=[ + 'chunk_split_state_driver.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/s/chunk_writes_tracker' + ], +) + +env.CppUnitTest( + target='chunk_split_state_driver_test', + source=[ + 'chunk_split_state_driver_test.cpp', + ], + LIBDEPS=[ + 'chunk_splitter', + ] +) + env.CppUnitTest( target='config_server_op_observer_test', source=[ diff --git a/src/mongo/s/chunk_split_state_driver.cpp b/src/mongo/db/s/chunk_split_state_driver.cpp index 707033d6961..ed6b704f4c5 100644 --- a/src/mongo/s/chunk_split_state_driver.cpp +++ b/src/mongo/db/s/chunk_split_state_driver.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/chunk_split_state_driver.h" +#include "mongo/db/s/chunk_split_state_driver.h" #include "mongo/util/assert_util.h" diff --git a/src/mongo/s/chunk_split_state_driver.h b/src/mongo/db/s/chunk_split_state_driver.h index 427f3cda056..427f3cda056 100644 --- a/src/mongo/s/chunk_split_state_driver.h +++ b/src/mongo/db/s/chunk_split_state_driver.h diff --git a/src/mongo/s/chunk_split_state_driver_test.cpp b/src/mongo/db/s/chunk_split_state_driver_test.cpp index 89042b3dbec..887126d54ee 100644 --- a/src/mongo/s/chunk_split_state_driver_test.cpp +++ b/src/mongo/db/s/chunk_split_state_driver_test.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/chunk_split_state_driver.h" +#include "mongo/db/s/chunk_split_state_driver.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/s/chunk_splitter.cpp b/src/mongo/db/s/chunk_splitter.cpp index 94ff2e4bc81..efec5986aa5 100644 --- a/src/mongo/db/s/chunk_splitter.cpp +++ b/src/mongo/db/s/chunk_splitter.cpp @@ -37,6 +37,7 @@ #include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/s/chunk_split_state_driver.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/split_chunk.h" #include "mongo/db/s/split_vector.h" @@ -259,19 +260,22 @@ void ChunkSplitter::onStepDown() { // TODO: Re-enable this log when auto split is actively running on shards. } -void ChunkSplitter::trySplitting(const NamespaceString& nss, +void ChunkSplitter::trySplitting(ChunkSplitStateDriver chunkSplitStateDriver, + const NamespaceString& nss, const BSONObj& min, const BSONObj& max, long dataWritten) { if (!_isPrimary) { return; } - uassertStatusOK(_threadPool.schedule([ this, nss, min, max, dataWritten ]() noexcept { - _runAutosplit(nss, min, max, dataWritten); - })); + uassertStatusOK(_threadPool.schedule( + [ this, &chunkSplitStateDriver, nss, min, max, dataWritten ]() noexcept { + _runAutosplit(std::move(chunkSplitStateDriver), nss, min, max, dataWritten); + })); } -void ChunkSplitter::_runAutosplit(const NamespaceString& nss, +void ChunkSplitter::_runAutosplit(ChunkSplitStateDriver chunkSplitStateDriver, + const NamespaceString& nss, const BSONObj& min, const BSONObj& max, long dataWritten) { @@ -306,6 +310,7 @@ void ChunkSplitter::_runAutosplit(const NamespaceString& nss, << " dataWritten since last check: " << dataWritten << " maxChunkSizeBytes: " << maxChunkSizeBytes; + chunkSplitStateDriver.prepareSplit(); auto splitPoints = uassertStatusOK(splitVector(opCtx.get(), nss, shardKeyPattern.toBSON(), @@ -359,6 +364,7 @@ void ChunkSplitter::_runAutosplit(const NamespaceString& nss, cm->getVersion(), ChunkRange(min, max), splitPoints)); + chunkSplitStateDriver.commitSplit(); const bool shouldBalance = isAutoBalanceEnabled(opCtx.get(), nss, balancerConfig); diff --git a/src/mongo/db/s/chunk_splitter.h b/src/mongo/db/s/chunk_splitter.h index 91de6c7fe10..71800dc3585 100644 --- a/src/mongo/db/s/chunk_splitter.h +++ b/src/mongo/db/s/chunk_splitter.h @@ -35,6 +35,7 @@ namespace mongo { class NamespaceString; class OperationContext; class ServiceContext; +class ChunkSplitStateDriver; /** * Handles asynchronous auto-splitting of chunks. @@ -75,7 +76,8 @@ public: /** * Schedules an autosplit task. This function throws on scheduling failure. */ - void trySplitting(const NamespaceString& nss, + void trySplitting(ChunkSplitStateDriver chunkSplitStateDriver, + const NamespaceString& nss, const BSONObj& min, const BSONObj& max, long dataWritten); @@ -89,7 +91,8 @@ private: * original owner. This optimization presumes that the user is doing writes with increasing or * decreasing shard key values. */ - void _runAutosplit(const NamespaceString& nss, + void _runAutosplit(ChunkSplitStateDriver chunkSplitStateDriver, + const NamespaceString& nss, const BSONObj& min, const BSONObj& max, long dataWritten); diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 20508022dbf..75c533f34cf 100644 --- a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -34,6 +34,8 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/s/chunk_split_state_driver.h" +#include "mongo/db/s/chunk_splitter.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/migration_source_manager.h" @@ -137,23 +139,12 @@ void onConfigDeleteInvalidateCachedCollectionMetadataAndNotify(OperationContext* } /** - * Returns true if the total number of bytes on the specified chunk nears the max size of a shard. - */ -bool shouldSplitChunk(OperationContext* opCtx, - const ShardKeyPattern& shardKeyPattern, - const Chunk& chunk) { - const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - invariant(balancerConfig); - - return chunk.shouldSplit(balancerConfig->getMaxChunkSizeBytes()); -} - -/** * If the collection is sharded, finds the chunk that contains the specified document and increments * the size tracked for that chunk by the specified amount of data written, in bytes. Returns the * number of total bytes on that chunk after the data is written. */ void incrementChunkOnInsertOrUpdate(OperationContext* opCtx, + const NamespaceString& nss, const ChunkManager& chunkManager, const BSONObj& document, long dataWritten) { @@ -175,13 +166,21 @@ void incrementChunkOnInsertOrUpdate(OperationContext* opCtx, // Note that we can assume the simple collation, because shard keys do not support non-simple // collations. auto chunk = chunkManager.findIntersectingChunkWithSimpleCollation(shardKey); - chunk.addBytesWritten(dataWritten); + auto chunkWritesTracker = chunk.getWritesTracker(); + chunkWritesTracker->addBytesWritten(dataWritten); + + const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - // If the chunk becomes too large, then we call the ChunkSplitter to schedule a split. Then, we - // reset the tracking for that chunk to 0. - if (shouldSplitChunk(opCtx, shardKeyPattern, chunk)) { - // TODO: call ChunkSplitter here - chunk.clearBytesWritten(); + if (chunkWritesTracker->shouldSplit(balancerConfig->getMaxChunkSizeBytes())) { + auto chunkSplitStateDriver = ChunkSplitStateDriver::tryInitiateSplit(chunkWritesTracker); + if (chunkSplitStateDriver) { + // TODO (SERVER-9287): Enable the following to trigger chunk splitting + // ChunkSplitter::get(opCtx).trySplitting(std::move(chunkSplitStateDriver.get()), + // nss, + // chunk.getMin(), + // chunk.getMax(), + // dataWritten); + } } } @@ -217,7 +216,7 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx, if (metadata) { incrementChunkOnInsertOrUpdate( - opCtx, *metadata->getChunkManager(), insertedDoc, insertedDoc.objsize()); + opCtx, nss, *metadata->getChunkManager(), insertedDoc, insertedDoc.objsize()); } } } @@ -313,8 +312,11 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } if (metadata) { - incrementChunkOnInsertOrUpdate( - opCtx, *metadata->getChunkManager(), args.updatedDoc, args.updatedDoc.objsize()); + incrementChunkOnInsertOrUpdate(opCtx, + args.nss, + *metadata->getChunkManager(), + args.updatedDoc, + args.updatedDoc.objsize()); } } diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 553f2211387..9d317909830 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -384,26 +384,6 @@ env.CppUnitTest( ) env.Library( - target='chunk_split_state_driver', - source=[ - 'chunk_split_state_driver.cpp', - ], - LIBDEPS=[ - 'chunk_writes_tracker' - ], -) - -env.CppUnitTest( - target='chunk_split_state_driver_test', - source=[ - 'chunk_split_state_driver_test.cpp', - ], - LIBDEPS=[ - 'chunk_split_state_driver', - ] -) - -env.Library( target='catalog_cache_test_fixture', source=[ 'catalog_cache_test_fixture.cpp', diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 2bc6c8748ac..d586ab8c879 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -98,23 +98,6 @@ bool ChunkInfo::containsKey(const BSONObj& shardKey) const { return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0; } -uint64_t ChunkInfo::getBytesWritten() const { - return _writesTracker->getBytesWritten(); -} - -void ChunkInfo::addBytesWritten(uint64_t bytesWrittenIncrement) { - _writesTracker->addBytesWritten(bytesWrittenIncrement); -} - -void ChunkInfo::clearBytesWritten() { - _writesTracker->clearBytesWritten(); -} - -bool ChunkInfo::shouldSplit(uint64_t maxChunkSize) const { - // Check if there are enough estimated bytes written to warrant a split - return _writesTracker->shouldSplit(maxChunkSize); -} - std::string ChunkInfo::toString() const { return str::stream() << ChunkType::shard() << ": " << _shardId << ", " << ChunkType::lastmod() << ": " << _lastmod.toString() << ", " << _range.toString(); diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 4e26d006402..7a5a23adedf 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -67,6 +67,13 @@ public: } /** + * Get writes tracker for this chunk. + */ + std::shared_ptr<ChunkWritesTracker> getWritesTracker() const { + return _writesTracker; + } + + /** * Returns a string represenation of the chunk for logging. */ std::string toString() const; @@ -78,15 +85,6 @@ public: bool containsKey(const BSONObj& shardKey) const; /** - * Get/increment/set the estimation of how much data was written for this chunk. - */ - uint64_t getBytesWritten() const; - void addBytesWritten(uint64_t bytesWrittenIncrement); - void clearBytesWritten(); - - bool shouldSplit(uint64_t desiredChunkSize) const; - - /** * Marks this chunk as jumbo. Only moves from false to true once and is used by the balancer. */ void markAsJumbo(); @@ -140,6 +138,13 @@ public: } /** + * Get writes tracker for this chunk. + */ + std::shared_ptr<ChunkWritesTracker> getWritesTracker() const { + return _chunkInfo.getWritesTracker(); + } + + /** * Returns a string represenation of the chunk for logging. */ std::string toString() const { @@ -155,23 +160,6 @@ public: } /** - * Get/increment/set the estimation of how much data was written for this chunk. - */ - uint64_t getBytesWritten() const { - return _chunkInfo.getBytesWritten(); - } - void addBytesWritten(uint64_t bytesWrittenIncrement) { - _chunkInfo.addBytesWritten(bytesWrittenIncrement); - } - void clearBytesWritten() { - _chunkInfo.clearBytesWritten(); - } - - bool shouldSplit(uint64_t desiredChunkSize) const { - return _chunkInfo.shouldSplit(desiredChunkSize); - } - - /** * Marks this chunk as jumbo. Only moves from false to true once and is used by the balancer. */ void markAsJumbo() { diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp index 19311ea39ab..b82c6c3f88a 100644 --- a/src/mongo/s/write_ops/cluster_write.cpp +++ b/src/mongo/s/write_ops/cluster_write.cpp @@ -41,6 +41,7 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_writes_tracker.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" @@ -249,11 +250,12 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, const bool maxIsInf = (0 == manager->getShardKeyPattern().getKeyPattern().globalMax().woCompare(chunk.getMax())); - chunk.addBytesWritten(chunkBytesWritten); + auto writesTracker = chunk.getWritesTracker(); + writesTracker->addBytesWritten(chunkBytesWritten); const uint64_t desiredChunkSize = balancerConfig->getMaxChunkSizeBytes(); - if (!chunk.shouldSplit(desiredChunkSize)) { + if (!writesTracker->shouldSplit(desiredChunkSize)) { return; } @@ -307,7 +309,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, // No split points means there isn't enough data to split on; 1 split point means we // have // between half the chunk size to full chunk size so there is no need to split yet - chunk.clearBytesWritten(); + writesTracker->clearBytesWritten(); return; } @@ -315,7 +317,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, // We don't want to reset _dataWritten since we want to check the other side right away } else { // We're splitting, so should wait a bit - chunk.clearBytesWritten(); + writesTracker->clearBytesWritten(); } // We assume that if the chunk being split is the first (or last) one on the collection, @@ -401,7 +403,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx, // Ensure the collection gets reloaded because of the move Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); } catch (const DBException& ex) { - chunk.clearBytesWritten(); + chunk.getWritesTracker()->clearBytesWritten(); if (ErrorCodes::isStaleShardVersionError(ex.code())) { log() << "Unable to auto-split chunk " << redact(chunkRange.toString()) << causedBy(ex) |