diff options
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 132 |
1 file changed, 98 insertions, 34 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 0aed75b5303..a5079a4041d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/base/status.h" +#include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog_raii.h" @@ -711,6 +712,61 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon _jumboChunkCloneState->clonerExec->detachFromOperationContext(); } +boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc( + OperationContext* opCtx, const CollectionPtr& collection) { + while (true) { + stdx::unique_lock lk(_mutex); + invariant(_inProgressReads >= 0); + RecordId nextRecordId; + Snapshotted<BSONObj> doc; + + _moreDocsCV.wait(lk, [&]() { + return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() || + _inProgressReads == 0; + }); + + // One of the following must now be true (corresponding to the three if conditions): + // 1. There is a document in the overflow set + // 2. The iterator has not reached the end of the record id set + // 3. The overflow set is empty, the iterator is at the end, and + // no threads are holding a document. This condition indicates + // that there are no more docs to return for the cloning phase. 
+ if (!_overflowDocs.empty()) { + doc = std::move(_overflowDocs.front()); + _overflowDocs.pop_front(); + ++_inProgressReads; + return doc; + } else if (_cloneRecordIdsIter != _cloneLocs.end()) { + nextRecordId = *_cloneRecordIdsIter; + ++_cloneRecordIdsIter; + ++_inProgressReads; + } else { + invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size()); + return boost::none; + } + + // In order to saturate the disk, the I/O operation must occur without + // holding the mutex. + lk.unlock(); + if (collection->findDoc(opCtx, nextRecordId, &doc)) + return doc; + lk.lock(); + ++_numRecordsPassedOver; + + // It is possible that this document is no longer in the collection, + // in which case, we try again and indicate to other threads that this + // thread is not holding a document. + --_inProgressReads; + _moreDocsCV.notify_one(); + } +} + +void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) { + stdx::lock_guard lk(_mutex); + invariant(_inProgressReads >= 1); + _overflowDocs.push_back(std::move(doc)); +} + void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx, const CollectionPtr& collection, BSONArrayBuilder* arrBuilder) { @@ -718,49 +774,56 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); - stdx::unique_lock<Latch> lk(_mutex); - auto iter = _cloneLocs.begin(); + while (auto doc = _getNextDoc(opCtx, collection)) { + ON_BLOCK_EXIT([&]() { + stdx::lock_guard lk(_mutex); + invariant(_inProgressReads > 0); + --_inProgressReads; + _moreDocsCV.notify_one(); + }); - for (; iter != _cloneLocs.end(); ++iter) { // We must always make progress in this method by at least one document because empty // return indicates there is no more initial clone data. 
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) { + _insertOverflowDoc(std::move(*doc)); break; } - auto nextRecordId = *iter; - - lk.unlock(); - ON_BLOCK_EXIT([&lk] { lk.lock(); }); - - Snapshotted<BSONObj> doc; - if (collection->findDoc(opCtx, nextRecordId, &doc)) { - // Do not send documents that are no longer in the chunk range being moved. This can - // happen when document shard key value of the document changed after the initial - // index scan during cloning. This is needed because the destination is very - // conservative in processing xferMod deletes and won't delete docs that are not in - // the range of the chunk being migrated. - if (!isDocInRange(doc.value(), - _args.getMin().value(), - _args.getMax().value(), - _shardKeyPattern)) { - continue; + // Do not send documents that are no longer in the chunk range being moved. This can + // happen when document shard key value of the document changed after the initial + // index scan during cloning. This is needed because the destination is very + // conservative in processing xferMod deletes and won't delete docs that are not in + // the range of the chunk being migrated. + if (!isDocInRange( + doc->value(), _args.getMin().value(), _args.getMax().value(), _shardKeyPattern)) { + { + stdx::lock_guard lk(_mutex); + _numRecordsPassedOver++; } + continue; + } - // Use the builder size instead of accumulating the document sizes directly so - // that we take into consideration the overhead of BSONArray indices. - if (arrBuilder->arrSize() && - (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) { - - break; - } + // Use the builder size instead of accumulating the document sizes directly so + // that we take into consideration the overhead of BSONArray indices. 
+ if (arrBuilder->arrSize() && + (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) { + _insertOverflowDoc(std::move(*doc)); + break; + } - arrBuilder->append(doc.value()); - ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); + { + stdx::lock_guard lk(_mutex); + _numRecordsCloned++; } + arrBuilder->append(doc->value()); + ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); } - _cloneLocs.erase(_cloneLocs.begin(), iter); + // When we reach here, there are no more documents to return to the destination. + // We therefore need to notify a other threads that maybe sleeping on the condition + // variable that we are done. + stdx::lock_guard lk(_mutex); + _moreDocsCV.notify_one(); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -803,7 +866,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, { // All clone data must have been drained before starting to fetch the incremental changes. stdx::unique_lock<Latch> lk(_mutex); - invariant(_cloneLocs.empty()); + invariant(_cloneRecordIdsIter == _cloneLocs.end()); // The "snapshot" for delete and update list must be taken under a single lock. This is to // ensure that we will preserve the causal order of writes. 
Always consume the delete @@ -1002,6 +1065,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC } } } + _cloneRecordIdsIter = _cloneLocs.begin(); } catch (DBException& exception) { exception.addContext("Executor error while scanning for documents belonging to chunk"); throw; @@ -1099,7 +1163,6 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC stdx::lock_guard<Latch> sl(_mutex); - const std::size_t cloneLocsRemaining = _cloneLocs.size(); int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize + _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs; @@ -1121,13 +1184,14 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "moveChunk data transfer progress", "response"_attr = redact(res), "memoryUsedBytes"_attr = _memoryUsed, - "docsRemainingToClone"_attr = cloneLocsRemaining, + "docsRemainingToClone"_attr = + _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver, "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes); } if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase && estimateUntransferredSessionsSize == 0) { - if (cloneLocsRemaining != 0 || + if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) || (_jumboChunkCloneState && _forceJumbo && PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) { return {ErrorCodes::OperationIncomplete, |