diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2021-06-21 13:00:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-23 08:43:38 +0000 |
commit | 3f53fa25e014fbe85d35220612359b8006f7e457 (patch) | |
tree | 9ca996e13b600e0aca653f3d1aa84bd05b4885d5 | |
parent | d6ffe85b864709e1da57e037b1ad3b74d77b6a45 (diff) | |
download | mongo-3f53fa25e014fbe85d35220612359b8006f7e457.tar.gz |
SERVER-56307 fix the convergence criteria to end the catchup phase
(cherry picked from commit 49209fc34e19bbb15405b0927e38ff3e7d9e9dc5)
(cherry picked from commit cbddd17826a934022c4be6287afc3dfde8206ad8)
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 112 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 36 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 3 |
4 files changed, 108 insertions, 53 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 005f1071ac7..e07695ac82f 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -162,6 +162,7 @@ public: case 'd': { stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); _cloner->_deleted.push_back(_idObj); + ++_cloner->_untransferredDeletesCounter; _cloner->_memoryUsed += _idObj.firstElement().size() + 5; } break; @@ -169,6 +170,7 @@ public: case 'u': { stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); _cloner->_reload.push_back(_idObj); + ++_cloner->_untransferredUpsertsCounter; _cloner->_memoryUsed += _idObj.firstElement().size() + 5; } break; @@ -317,8 +319,9 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( } if (res["state"].String() == "catchup" && supportsCriticalSectionDuringCatchUp) { - int64_t estimatedUntransferredModsSize = _deleted.size() * _averageObjectIdSize + - _reload.size() * _averageObjectSizeForCloneLocs; + int64_t estimatedUntransferredModsSize = + _untransferredDeletesCounter * _averageObjectIdSize + + _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs; auto estimatedUntransferredChunkPercentage = (std::min(_args.getMaxChunkSizeBytes(), estimatedUntransferredModsSize) * 100) / _args.getMaxChunkSizeBytes(); @@ -563,17 +566,36 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, BSONObjBuilder* builder) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); - stdx::lock_guard<stdx::mutex> sl(_mutex); + std::list<BSONObj> deleteList; + std::list<BSONObj> updateList; + + { + stdx::lock_guard<stdx::mutex> sl(_mutex); + + // All clone data must have been drained before starting to fetch the incremental changes + invariant(_cloneLocs.empty()); - // All clone data must have been drained before starting to fetch the incremental changes - invariant(_cloneLocs.empty()); + // The "snapshot" for delete and update list must be taken under a single lock. This is to + // ensure that we will preserve the causal order of writes. Always consume the delete + // buffer first, before the update buffer. If the delete is causally before the update to + // the same doc, then there's no problem since we consume the delete buffer first. If the + // delete is causally after, we will not be able to see the document when we attempt to + // fetch it, so it's also ok. + deleteList.splice(deleteList.cbegin(), _deleted); + updateList.splice(updateList.cbegin(), _reload); + } - long long docSizeAccumulator = 0; + auto totalDocSize = _xferDeletes(builder, &deleteList, 0); + totalDocSize = _xferUpdates(opCtx, db, builder, &updateList, totalDocSize); - _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false); - _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true); + builder->append("size", totalDocSize); - builder->append("size", docSizeAccumulator); + // Put back remaining ids we didn't consume + stdx::lock_guard<stdx::mutex> sl(_mutex); + _deleted.splice(_deleted.cbegin(), deleteList); + _untransferredDeletesCounter = _deleted.size(); + _reload.splice(_reload.cbegin(), updateList); + _untransferredUpsertsCounter = _reload.size(); return Status::OK(); } @@ -583,7 +605,9 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> sl(_mutex); _state = kDone; _reload.clear(); + _untransferredUpsertsCounter = 0; _deleted.clear(); + _untransferredDeletesCounter = 0; } // Implicitly resets _deleteNotifyExec to avoid possible invariant failure // in on destruction of MigrationChunkClonerSourceLegacy, and will always @@ -791,41 +815,61 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, - Database* db, - std::list<BSONObj>* docIdList, - BSONObjBuilder* builder, - const char* fieldName, - long long* sizeAccumulator, - bool explode) { +long long MigrationChunkClonerSourceLegacy::_xferDeletes(BSONObjBuilder* builder, + std::list<BSONObj>* removeList, + long long initialSize) { const long long maxSize = 1024 * 1024; - if (docIdList->size() == 0 || *sizeAccumulator > maxSize) { - return; + if (removeList->empty() || initialSize > maxSize) { + return initialSize; } - const std::string& ns = _args.getNss().ns(); - - BSONArrayBuilder arr(builder->subarrayStart(fieldName)); + long long totalSize = initialSize; + BSONArrayBuilder arr(builder->subarrayStart("deleted")); - std::list<BSONObj>::iterator docIdIter = docIdList->begin(); - while (docIdIter != docIdList->end() && *sizeAccumulator < maxSize) { + auto docIdIter = removeList->begin(); + for (; docIdIter != removeList->end() && totalSize < maxSize; ++docIdIter) { BSONObj idDoc = *docIdIter; - if (explode) { - BSONObj fullDoc; - if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) { - arr.append(fullDoc); - *sizeAccumulator += fullDoc.objsize(); - } - } else { - arr.append(idDoc); - *sizeAccumulator += idDoc.objsize(); - } + arr.append(idDoc); + totalSize += idDoc.objsize(); + } - docIdIter = docIdList->erase(docIdIter); + removeList->erase(removeList->begin(), docIdIter); + + arr.done(); + return totalSize; +} + +long long MigrationChunkClonerSourceLegacy::_xferUpdates(OperationContext* opCtx, + Database* db, + BSONObjBuilder* builder, + std::list<BSONObj>* updateList, + long long initialSize) { + const long long maxSize = 1024 * 1024; + + if (updateList->empty() || initialSize > maxSize) { + return initialSize; } + const auto& nss = _args.getNss(); + BSONArrayBuilder arr(builder->subarrayStart("reload")); + long long totalSize = initialSize; + + auto iter = updateList->begin(); + for (; iter != updateList->end() && totalSize < maxSize; ++iter) { + auto idDoc = *iter; + + BSONObj fullDoc; + if (Helpers::findById(opCtx, db, nss.ns().c_str(), idDoc, fullDoc)) { + arr.append(fullDoc); + totalSize += fullDoc.objsize(); + } + } + + updateList->erase(updateList->begin(), iter); + arr.done(); + return totalSize; } boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index aa9b5271119..635fd2a4a93 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -203,20 +203,24 @@ private: Status _storeCurrentLocs(OperationContext* opCtx); /** - * Insert items from docIdList to a new array with the given fieldName in the given builder. If - * explode is true, the inserted object will be the full version of the document. Note that - * whenever an item from the docList is inserted to the array, it will also be removed from - * docList. - * - * Should be holding the collection lock for ns if explode is true. + * Appends to the builder the list of _id of documents that were deleted during migration. + * Entries appended to the builder are removed from the list. + * Returns the total size of the documents that were appended + initialSize. + */ + long long _xferDeletes(BSONObjBuilder* builder, + std::list<BSONObj>* removeList, + long long initialSize); + + /** + * Appends to the builder the list of full documents that were modified/inserted during the + * migration. Entries appended to the builder are removed from the list. + * Returns the total size of the documents that were appended + initialSize. */ - void _xfer(OperationContext* opCtx, - Database* db, - std::list<BSONObj>* docIdList, - BSONObjBuilder* builder, - const char* fieldName, - long long* sizeAccumulator, - bool explode); + long long _xferUpdates(OperationContext* opCtx, + Database* db, + BSONObjBuilder* builder, + std::list<BSONObj>* updateList, + long long initialSize); // The original move chunk request const MoveChunkRequest _args; @@ -258,10 +262,16 @@ private: // List of _id of documents that were modified that must be re-cloned (xfer mods) std::list<BSONObj> _reload; + // Amount of upsert xfer mods that have not yet reached the recipient. + size_t _untransferredUpsertsCounter{0}; + // List of _id of documents that were deleted during clone that should be deleted later (xfer // mods) std::list<BSONObj> _deleted; + // Amount of delete xfer mods that have not yet reached the recipient. + size_t _untransferredDeletesCounter{0}; + // Total bytes in _reload + _deleted (xfer mods) uint64_t _memoryUsed{0}; }; diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index c2a19f9cf2b..06a117662cf 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -230,7 +230,7 @@ MigrationDestinationManager::State MigrationDestinationManager::getState() const return _state; } -void MigrationDestinationManager::setState(State newState) { +void MigrationDestinationManager::_setState(State newState) { stdx::lock_guard<stdx::mutex> sl(_mutex); _state = newState; _stateChangedCV.notify_all(); @@ -819,7 +819,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { repl::OpTime lastOpApplied; { // 3. Initial bulk clone - setState(CLONE); + _setState(CLONE); _sessionMigration->start(opCtx->getServiceContext()); @@ -921,7 +921,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { { // 4. Do bulk of mods - setState(CATCHUP); + _setState(CATCHUP); while (true) { auto res = uassertStatusOKWithContext( @@ -993,7 +993,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { { // 5. Wait for commit - setState(STEADY); + _setState(STEADY); bool transferAfterCommit = false; while (getState() == STEADY || getState() == COMMIT_START) { @@ -1058,7 +1058,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { return; } - setState(DONE); + _setState(DONE); timing.done(6); MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index a48f1def197..36a1eab7a00 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -81,7 +81,6 @@ public: static MigrationDestinationManager* get(OperationContext* opCtx); State getState() const; - void setState(State newState); /** * Checks whether the MigrationDestinationManager is currently handling a migration. @@ -146,6 +145,8 @@ private: void _setStateFail(StringData msg); void _setStateFailWarn(StringData msg); + void _setState(State newState); + /** * Thread which drives the migration apply process on the recipient side. */ |