diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2021-05-20 14:22:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-21 12:32:14 +0000 |
commit | c84c7f406a7fc0ea5a63a5cc807687e730580a70 (patch) | |
tree | 2896c70f65adb6f652c35b1b8d4e03ca2e83e808 | |
parent | 309f1ad1ce1bb745c956361e4cef29fc67289f1d (diff) | |
download | mongo-c84c7f406a7fc0ea5a63a5cc807687e730580a70.tar.gz |
SERVER-56307 Allow the donor to enter the critical section when the untransferred mods are within a convergence threshold.
(cherry picked from commit f32a3d08fb7128de3c1b43b8478a5e4626101b89)
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 3 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_runtime_d_params.idl | 13 | ||||
-rw-r--r-- | src/mongo/db/s/start_chunk_clone_request.h | 3 |
5 files changed, 89 insertions, 7 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 28377ec41a0..863ea9150c4 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -39,11 +39,13 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_source_manager.h" +#include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" #include "mongo/db/s/start_chunk_clone_request.h" #include "mongo/db/service_context.h" @@ -877,18 +879,18 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor( InternalPlanner::IndexScanOptions scanOption) { // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any // multi-key index prefixed by shard key cannot be multikey over the shard key fields. 
- const IndexDescriptor* idx = + const IndexDescriptor* shardKeyIdx = collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, _shardKeyPattern.toBSON(), false); // requireSingleKey - if (!idx) { + if (!shardKeyIdx) { return {ErrorCodes::IndexNotFound, str::stream() << "can't find index with prefix " << _shardKeyPattern.toBSON() << " in storeCurrentLocs for " << _args.getNss().ns()}; } // Assume both min and max non-empty, append MinKey's to make them fit chosen index - const KeyPattern kp(idx->keyPattern()); + const KeyPattern kp(shardKeyIdx->keyPattern()); BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMinKey(), false)); BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false)); @@ -897,7 +899,7 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor( // being queued and will migrate in the 'transferMods' stage. return InternalPlanner::indexScan(opCtx, collection, - idx, + shardKeyIdx, min, max, BoundInclusion::kIncludeStartKeyOnly, @@ -980,6 +982,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC const uint64_t collectionAverageObjectSize = collection->averageObjectSize(opCtx); + uint64_t averageObjectIdSize = 0; + const uint64_t defaultObjectIdSize = OID::kOIDSize; + if (totalRecs > 0) { + const auto indexCatalog = collection->getIndexCatalog(); + const auto idIdx = indexCatalog->findIdIndex(opCtx); + if (!idIdx) { + return {ErrorCodes::IndexNotFound, + str::stream() << "can't find index '_id' in storeCurrentLocs for " + << _args.getNss().ns()}; + } + averageObjectIdSize = + indexCatalog->getEntry(idIdx)->accessMethod()->getSpaceUsedBytes(opCtx) / totalRecs; + } + if (isLargeChunk) { return { ErrorCodes::ChunkTooBig, @@ -992,8 +1008,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC } stdx::lock_guard<Latch> lk(_mutex); - _averageObjectSizeForCloneLocs = collectionAverageObjectSize + 12; - + _averageObjectSizeForCloneLocs = 
collectionAverageObjectSize + defaultObjectIdSize; + _averageObjectIdSize = std::max(averageObjectIdSize, defaultObjectIdSize); return Status::OK(); } @@ -1107,6 +1123,31 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC return Status::OK(); } + bool supportsCriticalSectionDuringCatchUp = false; + if (auto featureSupportedField = + res[StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp]) { + if (!featureSupportedField.booleanSafe()) { + return {ErrorCodes::Error(563070), + str::stream() + << "Illegal value for " + << StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp}; + } + supportsCriticalSectionDuringCatchUp = true; + } + + if (res["state"].String() == "catchup" && supportsCriticalSectionDuringCatchUp) { + int64_t estimatedUntransferredModsSize = _deleted.size() * _averageObjectIdSize + + _reload.size() * _averageObjectSizeForCloneLocs; + auto estimatedUntransferredChunkPercentage = + (std::min(_args.getMaxChunkSizeBytes(), estimatedUntransferredModsSize) * 100) / + _args.getMaxChunkSizeBytes(); + if (estimatedUntransferredChunkPercentage < minCatchUpPercentageBeforeBlockingWrites) { + // The recipient is sufficiently caught-up with the writes on the donor. + // Block writes, so that it can drain everything. + return Status::OK(); + } + } + if (res["state"].String() == "fail") { return {ErrorCodes::OperationFailed, str::stream() << "Data transfer error: " << res["errmsg"].str()}; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 9d438d20eb0..d95d360b0af 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -359,6 +359,9 @@ private: // pre-allocation (initial clone). uint64_t _averageObjectSizeForCloneLocs{0}; + // The estimated average object _id size during the clone phase. 
+ uint64_t _averageObjectIdSize{0}; + // Represents all of the requested but not yet fulfilled operations to be tracked, with regards // to the chunk being cloned. uint64_t _outstandingOperationTrackRequests{0}; diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 85b980e73eb..3066f645590 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -365,6 +365,7 @@ void MigrationDestinationManager::report(BSONObjBuilder& b, b.append("min", _min); b.append("max", _max); b.append("shardKeyPattern", _shardKeyPattern); + b.append(StartChunkCloneRequest::kSupportsCriticalSectionDuringCatchUp, true); b.append("state", stateToString(_state)); @@ -565,6 +566,24 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio stdx::unique_lock<Latch> lock(_mutex); + const auto convergenceTimeout = + Shard::kDefaultConfigCommandTimeout + Shard::kDefaultConfigCommandTimeout / 4; + + // The donor may have started the commit while the recipient is still busy processing + // the last batch of mods sent in the catch up phase. Allow some time for synching up. + auto deadline = Date_t::now() + convergenceTimeout; + + while (_state == CATCHUP) { + if (stdx::cv_status::timeout == + _stateChangedCV.wait_until(lock, deadline.toSystemTimePoint())) { + return {ErrorCodes::CommandFailed, + str::stream() << "startCommit timed out waiting for the catch up completion. " + << "Sender's session is " << sessionId.toString() + << ". Current session is " + << (_sessionId ? _sessionId->toString() : "none.")}; + } + } + if (_state != STEADY) { return {ErrorCodes::CommandFailed, str::stream() << "Migration startCommit attempted when not in STEADY state." 
@@ -590,7 +609,9 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio _state = COMMIT_START; _stateChangedCV.notify_all(); - auto const deadline = Date_t::now() + Seconds(30); + // Assigning a timeout slightly higher than the one used for network requests to the config + // server. Enough time to retry at least once in case of network failures (SERVER-51397). + deadline = Date_t::now() + convergenceTimeout; while (_sessionId) { if (stdx::cv_status::timeout == _isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) { @@ -1173,6 +1194,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { const auto& mods = res.response; if (mods["size"].number() == 0) { + // There are no more pending modifications to be applied. End the catchup phase break; } diff --git a/src/mongo/db/s/sharding_runtime_d_params.idl b/src/mongo/db/s/sharding_runtime_d_params.idl index dbe92c56f81..76aebc4934f 100644 --- a/src/mongo/db/s/sharding_runtime_d_params.idl +++ b/src/mongo/db/s/sharding_runtime_d_params.idl @@ -84,6 +84,19 @@ server_parameters: cpp_varname: migrationLockAcquisitionMaxWaitMS default: 500 + minCatchUpPercentageBeforeBlockingWrites: + description: >- + The maximum percentage of untransferred chunk mods at the end of a catch up iteration + that may be deferred to the next phase of the migration protocol + (where new writes get blocked). + set_at: [startup] + cpp_vartype: int + cpp_varname: minCatchUpPercentageBeforeBlockingWrites + validator: + gte: 0 + lte: 100 + default: 10 + orphanCleanupDelaySecs: description: 'How long to wait before starting cleanup of an emigrated chunk range.' 
set_at: [startup, runtime] diff --git a/src/mongo/db/s/start_chunk_clone_request.h b/src/mongo/db/s/start_chunk_clone_request.h index 17cc08f4460..b9bf2080def 100644 --- a/src/mongo/db/s/start_chunk_clone_request.h +++ b/src/mongo/db/s/start_chunk_clone_request.h @@ -49,6 +49,9 @@ class StatusWith; */ class StartChunkCloneRequest { public: + static constexpr auto kSupportsCriticalSectionDuringCatchUp = + "supportsCriticalSectionDuringCatchUp"_sd; + /** * Parses the input command and produces a request corresponding to its arguments. */ |