summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Polato <paolo.polato@mongodb.com>2021-06-21 13:00:26 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-06-23 08:43:38 +0000
commit3f53fa25e014fbe85d35220612359b8006f7e457 (patch)
tree9ca996e13b600e0aca653f3d1aa84bd05b4885d5
parentd6ffe85b864709e1da57e037b1ad3b74d77b6a45 (diff)
downloadmongo-3f53fa25e014fbe85d35220612359b8006f7e457.tar.gz
SERVER-56307 fix the convergence criteria to end the catchup phase
(cherry picked from commit 49209fc34e19bbb15405b0927e38ff3e7d9e9dc5) (cherry picked from commit cbddd17826a934022c4be6287afc3dfde8206ad8)
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp112
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h36
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp10
-rw-r--r--src/mongo/db/s/migration_destination_manager.h3
4 files changed, 108 insertions, 53 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 005f1071ac7..e07695ac82f 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -162,6 +162,7 @@ public:
case 'd': {
stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex);
_cloner->_deleted.push_back(_idObj);
+ ++_cloner->_untransferredDeletesCounter;
_cloner->_memoryUsed += _idObj.firstElement().size() + 5;
} break;
@@ -169,6 +170,7 @@ public:
case 'u': {
stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex);
_cloner->_reload.push_back(_idObj);
+ ++_cloner->_untransferredUpsertsCounter;
_cloner->_memoryUsed += _idObj.firstElement().size() + 5;
} break;
@@ -317,8 +319,9 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
}
if (res["state"].String() == "catchup" && supportsCriticalSectionDuringCatchUp) {
- int64_t estimatedUntransferredModsSize = _deleted.size() * _averageObjectIdSize +
- _reload.size() * _averageObjectSizeForCloneLocs;
+ int64_t estimatedUntransferredModsSize =
+ _untransferredDeletesCounter * _averageObjectIdSize +
+ _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
auto estimatedUntransferredChunkPercentage =
(std::min(_args.getMaxChunkSizeBytes(), estimatedUntransferredModsSize) * 100) /
_args.getMaxChunkSizeBytes();
@@ -563,17 +566,36 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
BSONObjBuilder* builder) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS));
- stdx::lock_guard<stdx::mutex> sl(_mutex);
+ std::list<BSONObj> deleteList;
+ std::list<BSONObj> updateList;
+
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+
+ // All clone data must have been drained before starting to fetch the incremental changes
+ invariant(_cloneLocs.empty());
- // All clone data must have been drained before starting to fetch the incremental changes
- invariant(_cloneLocs.empty());
+ // The "snapshot" for delete and update list must be taken under a single lock. This is to
+ // ensure that we will preserve the causal order of writes. Always consume the delete
+ // buffer first, before the update buffer. If the delete is causally before the update to
+ // the same doc, then there's no problem since we consume the delete buffer first. If the
+ // delete is causally after, we will not be able to see the document when we attempt to
+ // fetch it, so it's also ok.
+ deleteList.splice(deleteList.cbegin(), _deleted);
+ updateList.splice(updateList.cbegin(), _reload);
+ }
- long long docSizeAccumulator = 0;
+ auto totalDocSize = _xferDeletes(builder, &deleteList, 0);
+ totalDocSize = _xferUpdates(opCtx, db, builder, &updateList, totalDocSize);
- _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false);
- _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true);
+ builder->append("size", totalDocSize);
- builder->append("size", docSizeAccumulator);
+ // Put back remaining ids we didn't consume
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _deleted.splice(_deleted.cbegin(), deleteList);
+ _untransferredDeletesCounter = _deleted.size();
+ _reload.splice(_reload.cbegin(), updateList);
+ _untransferredUpsertsCounter = _reload.size();
return Status::OK();
}
@@ -583,7 +605,9 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = kDone;
_reload.clear();
+ _untransferredUpsertsCounter = 0;
_deleted.clear();
+ _untransferredDeletesCounter = 0;
}
// Implicitly resets _deleteNotifyExec to avoid possible invariant failure
// in on destruction of MigrationChunkClonerSourceLegacy, and will always
@@ -791,41 +815,61 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
return Status::OK();
}
-void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
- Database* db,
- std::list<BSONObj>* docIdList,
- BSONObjBuilder* builder,
- const char* fieldName,
- long long* sizeAccumulator,
- bool explode) {
+long long MigrationChunkClonerSourceLegacy::_xferDeletes(BSONObjBuilder* builder,
+ std::list<BSONObj>* removeList,
+ long long initialSize) {
const long long maxSize = 1024 * 1024;
- if (docIdList->size() == 0 || *sizeAccumulator > maxSize) {
- return;
+ if (removeList->empty() || initialSize > maxSize) {
+ return initialSize;
}
- const std::string& ns = _args.getNss().ns();
-
- BSONArrayBuilder arr(builder->subarrayStart(fieldName));
+ long long totalSize = initialSize;
+ BSONArrayBuilder arr(builder->subarrayStart("deleted"));
- std::list<BSONObj>::iterator docIdIter = docIdList->begin();
- while (docIdIter != docIdList->end() && *sizeAccumulator < maxSize) {
+ auto docIdIter = removeList->begin();
+ for (; docIdIter != removeList->end() && totalSize < maxSize; ++docIdIter) {
BSONObj idDoc = *docIdIter;
- if (explode) {
- BSONObj fullDoc;
- if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) {
- arr.append(fullDoc);
- *sizeAccumulator += fullDoc.objsize();
- }
- } else {
- arr.append(idDoc);
- *sizeAccumulator += idDoc.objsize();
- }
+ arr.append(idDoc);
+ totalSize += idDoc.objsize();
+ }
- docIdIter = docIdList->erase(docIdIter);
+ removeList->erase(removeList->begin(), docIdIter);
+
+ arr.done();
+ return totalSize;
+}
+
+long long MigrationChunkClonerSourceLegacy::_xferUpdates(OperationContext* opCtx,
+ Database* db,
+ BSONObjBuilder* builder,
+ std::list<BSONObj>* updateList,
+ long long initialSize) {
+ const long long maxSize = 1024 * 1024;
+
+ if (updateList->empty() || initialSize > maxSize) {
+ return initialSize;
}
+ const auto& nss = _args.getNss();
+ BSONArrayBuilder arr(builder->subarrayStart("reload"));
+ long long totalSize = initialSize;
+
+ auto iter = updateList->begin();
+ for (; iter != updateList->end() && totalSize < maxSize; ++iter) {
+ auto idDoc = *iter;
+
+ BSONObj fullDoc;
+ if (Helpers::findById(opCtx, db, nss.ns().c_str(), idDoc, fullDoc)) {
+ arr.append(fullDoc);
+ totalSize += fullDoc.objsize();
+ }
+ }
+
+ updateList->erase(updateList->begin(), iter);
+
arr.done();
+ return totalSize;
}
boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index aa9b5271119..635fd2a4a93 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -203,20 +203,24 @@ private:
Status _storeCurrentLocs(OperationContext* opCtx);
/**
- * Insert items from docIdList to a new array with the given fieldName in the given builder. If
- * explode is true, the inserted object will be the full version of the document. Note that
- * whenever an item from the docList is inserted to the array, it will also be removed from
- * docList.
- *
- * Should be holding the collection lock for ns if explode is true.
+ * Appends to the builder the list of _id of documents that were deleted during migration.
+ * Entries appended to the builder are removed from the list.
+ * Returns the total size of the documents that were appended + initialSize.
+ */
+ long long _xferDeletes(BSONObjBuilder* builder,
+ std::list<BSONObj>* removeList,
+ long long initialSize);
+
+ /**
+ * Appends to the builder the list of full documents that were modified/inserted during the
+ * migration. Entries appended to the builder are removed from the list.
+ * Returns the total size of the documents that were appended + initialSize.
*/
- void _xfer(OperationContext* opCtx,
- Database* db,
- std::list<BSONObj>* docIdList,
- BSONObjBuilder* builder,
- const char* fieldName,
- long long* sizeAccumulator,
- bool explode);
+ long long _xferUpdates(OperationContext* opCtx,
+ Database* db,
+ BSONObjBuilder* builder,
+ std::list<BSONObj>* updateList,
+ long long initialSize);
// The original move chunk request
const MoveChunkRequest _args;
@@ -258,10 +262,16 @@ private:
// List of _id of documents that were modified that must be re-cloned (xfer mods)
std::list<BSONObj> _reload;
+ // Amount of upsert xfer mods that have not yet reached the recipient.
+ size_t _untransferredUpsertsCounter{0};
+
// List of _id of documents that were deleted during clone that should be deleted later (xfer
// mods)
std::list<BSONObj> _deleted;
+ // Amount of delete xfer mods that have not yet reached the recipient.
+ size_t _untransferredDeletesCounter{0};
+
// Total bytes in _reload + _deleted (xfer mods)
uint64_t _memoryUsed{0};
};
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index c2a19f9cf2b..06a117662cf 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -230,7 +230,7 @@ MigrationDestinationManager::State MigrationDestinationManager::getState() const
return _state;
}
-void MigrationDestinationManager::setState(State newState) {
+void MigrationDestinationManager::_setState(State newState) {
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = newState;
_stateChangedCV.notify_all();
@@ -819,7 +819,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
repl::OpTime lastOpApplied;
{
// 3. Initial bulk clone
- setState(CLONE);
+ _setState(CLONE);
_sessionMigration->start(opCtx->getServiceContext());
@@ -921,7 +921,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
{
// 4. Do bulk of mods
- setState(CATCHUP);
+ _setState(CATCHUP);
while (true) {
auto res = uassertStatusOKWithContext(
@@ -993,7 +993,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
{
// 5. Wait for commit
- setState(STEADY);
+ _setState(STEADY);
bool transferAfterCommit = false;
while (getState() == STEADY || getState() == COMMIT_START) {
@@ -1058,7 +1058,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
return;
}
- setState(DONE);
+ _setState(DONE);
timing.done(6);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep6);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index a48f1def197..36a1eab7a00 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -81,7 +81,6 @@ public:
static MigrationDestinationManager* get(OperationContext* opCtx);
State getState() const;
- void setState(State newState);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration.
@@ -146,6 +145,8 @@ private:
void _setStateFail(StringData msg);
void _setStateFailWarn(StringData msg);
+ void _setState(State newState);
+
/**
* Thread which drives the migration apply process on the recipient side.
*/