diff options
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 234 |
1 files changed, 156 insertions, 78 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a5079a4041d..c324a8affe5 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -413,7 +413,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
             }
         } else {
             invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState);
-            invariant(_cloneLocs.empty());
+            invariant(!_cloneList.hasMore());  // Nothing may be left in the clone list here.
         }
     }
 
@@ -712,61 +712,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
     _jumboChunkCloneState->clonerExec->detachFromOperationContext();
 }
 
-boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
-    OperationContext* opCtx, const CollectionPtr& collection) {
-    while (true) {
-        stdx::unique_lock lk(_mutex);
-        invariant(_inProgressReads >= 0);
-        RecordId nextRecordId;
-        Snapshotted<BSONObj> doc;
-
-        _moreDocsCV.wait(lk, [&]() {
-            return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
-                _inProgressReads == 0;
-        });
-
-        // One of the following must now be true (corresponding to the three if conditions):
-        //   1. There is a document in the overflow set
-        //   2. The iterator has not reached the end of the record id set
-        //   3. The overflow set is empty, the iterator is at the end, and
-        //      no threads are holding a document. This condition indicates
-        //      that there are no more docs to return for the cloning phase.
-        if (!_overflowDocs.empty()) {
-            doc = std::move(_overflowDocs.front());
-            _overflowDocs.pop_front();
-            ++_inProgressReads;
-            return doc;
-        } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
-            nextRecordId = *_cloneRecordIdsIter;
-            ++_cloneRecordIdsIter;
-            ++_inProgressReads;
-        } else {
-            invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
-            return boost::none;
-        }
-
-        // In order to saturate the disk, the I/O operation must occur without
-        // holding the mutex.
-        lk.unlock();
-        if (collection->findDoc(opCtx, nextRecordId, &doc))
-            return doc;
-        lk.lock();
-        ++_numRecordsPassedOver;
-
-        // It is possible that this document is no longer in the collection,
-        // in which case, we try again and indicate to other threads that this
-        // thread is not holding a document.
-        --_inProgressReads;
-        _moreDocsCV.notify_one();
-    }
-}
-
-void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
-    stdx::lock_guard lk(_mutex);
-    invariant(_inProgressReads >= 1);
-    _overflowDocs.push_back(std::move(doc));
-}
-
 void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
                                                                     const CollectionPtr& collection,
                                                                     BSONArrayBuilder* arrBuilder) {
@@ -774,18 +719,24 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
                            internalQueryExecYieldIterations.load(),
                            Milliseconds(internalQueryExecYieldPeriodMS.load()));
 
-    while (auto doc = _getNextDoc(opCtx, collection)) {
-        ON_BLOCK_EXIT([&]() {
+    while (true) {
+        int recordsNoLongerExist = 0;
+        auto docInFlight = _cloneList.getNextDoc(opCtx, collection, &recordsNoLongerExist);
+
+        if (recordsNoLongerExist) {
             stdx::lock_guard lk(_mutex);
-            invariant(_inProgressReads > 0);
-            --_inProgressReads;
-            _moreDocsCV.notify_one();
-        });
+            _numRecordsPassedOver += recordsNoLongerExist;
+        }
+
+        const auto& doc = docInFlight->getDoc();
+        if (!doc) {
+            break;
+        }
 
         // We must always make progress in this method by at least one document because empty
         // return indicates there is no more initial clone data.
         if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
-            _insertOverflowDoc(std::move(*doc));
+            _cloneList.insertOverflowDoc(*doc);
             break;
         }
 
@@ -807,7 +758,7 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
         // that we take into consideration the overhead of BSONArray indices.
         if (arrBuilder->arrSize() &&
             (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
-            _insertOverflowDoc(std::move(*doc));
+            _cloneList.insertOverflowDoc(*doc);
             break;
         }
 
@@ -818,12 +769,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
         arrBuilder->append(doc->value());
         ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
     }
-
-    // When we reach here, there are no more documents to return to the destination.
-    // We therefore need to notify a other threads that maybe sleeping on the condition
-    // variable that we are done.
-    stdx::lock_guard lk(_mutex);
-    _moreDocsCV.notify_one();
 }
 
 uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -832,7 +777,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
         return static_cast<uint64_t>(BSONObjMaxUserSize);
 
     return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
-                    _averageObjectSizeForCloneLocs * _cloneLocs.size());
+                    _averageObjectSizeForCloneLocs * _cloneList.size());
 }
 
 Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
@@ -866,7 +811,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
     {
         // All clone data must have been drained before starting to fetch the incremental changes.
         stdx::unique_lock<Latch> lk(_mutex);
-        invariant(_cloneRecordIdsIter == _cloneLocs.end());
+        invariant(!_cloneList.hasMore());
 
         // The "snapshot" for delete and update list must be taken under a single lock. This is to
         // ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1045,6 +990,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
     try {
         BSONObj obj;
         RecordId recordId;
+        RecordIdSet recordIdSet;
+
         while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) {
             Status interruptStatus = opCtx->checkForInterruptNoAssert();
             if (!interruptStatus.isOK()) {
@@ -1052,20 +999,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
             }
 
             if (!isLargeChunk) {
-                stdx::lock_guard<Latch> lk(_mutex);
-                _cloneLocs.insert(recordId);
+                recordIdSet.insert(recordId);
             }
 
             if (++recCount > maxRecsWhenFull) {
                 isLargeChunk = true;
 
                 if (_forceJumbo) {
-                    _cloneLocs.clear();
+                    recordIdSet.clear();
                     break;
                 }
             }
         }
-        _cloneRecordIdsIter = _cloneLocs.begin();
+
+        _cloneList.populateList(std::move(recordIdSet));
     } catch (DBException& exception) {
         exception.addContext("Executor error while scanning for documents belonging to chunk");
         throw;
@@ -1185,13 +1132,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
                   "response"_attr = redact(res),
                   "memoryUsedBytes"_attr = _memoryUsed,
                   "docsRemainingToClone"_attr =
-                      _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
+                      _cloneList.size() - _numRecordsCloned - _numRecordsPassedOver,
                   "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
         }
 
         if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
             estimateUntransferredSessionsSize == 0) {
-            if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
+            if (_cloneList.hasMore() ||
                 (_jumboChunkCloneState && _forceJumbo &&
                  PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
                 return {ErrorCodes::OperationIncomplete,
@@ -1335,4 +1282,135 @@ MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch()
     return _sessionCatalogSource->getNotificationForNewOplog();
 }
 
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::DocumentInFlightWithLock(
+    WithLock lock, MigrationChunkClonerSourceLegacy::CloneList& clonerList)
+    : _inProgressReadToken(
+          std::make_unique<MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken>(
+              lock, clonerList)) {}
+
+void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::setDoc(
+    boost::optional<Snapshotted<BSONObj>> doc) {
+    _doc = std::move(doc);
+}
+
+std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::release() {
+    invariant(_inProgressReadToken);
+
+    return std::make_unique<
+        MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>(
+        std::move(_inProgressReadToken), std::move(_doc));
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::
+    DocumentInFlightWhileNotInLock(
+        std::unique_ptr<CloneList::InProgressReadToken> inProgressReadToken,
+        boost::optional<Snapshotted<BSONObj>> doc)
+    : _inProgressReadToken(std::move(inProgressReadToken)), _doc(std::move(doc)) {}
+
+void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::setDoc(
+    boost::optional<Snapshotted<BSONObj>> doc) {
+    _doc = std::move(doc);
+}
+
+const boost::optional<Snapshotted<BSONObj>>&
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::getDoc() {
+    return _doc;
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::InProgressReadToken(
+    WithLock withLock, CloneList& cloneList)
+    : _cloneList(cloneList) {
+    _cloneList._startedOneInProgressRead(withLock);
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::~InProgressReadToken() {
+    _cloneList._finishedOneInProgressRead();
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::CloneList() {
+    _recordIdsIter = _recordIds.begin();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::populateList(RecordIdSet recordIds) {
+    stdx::lock_guard lk(_mutex);
+    _recordIds = std::move(recordIds);
+    _recordIdsIter = _recordIds.begin();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::insertOverflowDoc(Snapshotted<BSONObj> doc) {
+    stdx::lock_guard lk(_mutex);
+    invariant(_inProgressReads >= 1);
+    _overflowDocs.push_back(std::move(doc));
+}
+
+bool MigrationChunkClonerSourceLegacy::CloneList::hasMore() const {
+    stdx::lock_guard lk(_mutex);
+    return _recordIdsIter != _recordIds.cend() || !_overflowDocs.empty();
+}
+
+std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
+MigrationChunkClonerSourceLegacy::CloneList::getNextDoc(OperationContext* opCtx,
+                                                        const CollectionPtr& collection,
+                                                        int* numRecordsNoLongerExist) {
+    while (true) {
+        stdx::unique_lock lk(_mutex);
+        invariant(_inProgressReads >= 0);
+        RecordId nextRecordId;
+
+        opCtx->waitForConditionOrInterrupt(_moreDocsCV, lk, [&]() {
+            return _recordIdsIter != _recordIds.end() || !_overflowDocs.empty() ||
+                _inProgressReads == 0;
+        });
+
+        DocumentInFlightWithLock docInFlight(lk, *this);
+
+        // One of the following must now be true (corresponding to the three if conditions):
+        //   1. There is a document in the overflow set
+        //   2. The iterator has not reached the end of the record id set
+        //   3. The overflow set is empty, the iterator is at the end, and
+        //      no threads are holding a document. This condition indicates
+        //      that there are no more docs to return for the cloning phase.
+        if (!_overflowDocs.empty()) {
+            docInFlight.setDoc(std::move(_overflowDocs.front()));
+            _overflowDocs.pop_front();
+            return docInFlight.release();
+        } else if (_recordIdsIter != _recordIds.end()) {
+            nextRecordId = *_recordIdsIter;
+            ++_recordIdsIter;
+        } else {
+            return docInFlight.release();
+        }
+
+        lk.unlock();
+
+        auto docInFlightWhileNotLocked = docInFlight.release();
+
+        Snapshotted<BSONObj> doc;
+        if (collection->findDoc(opCtx, nextRecordId, &doc)) {
+            docInFlightWhileNotLocked->setDoc(std::move(doc));
+            return docInFlightWhileNotLocked;
+        }
+
+        if (numRecordsNoLongerExist) {
+            (*numRecordsNoLongerExist)++;
+        }
+    }
+}
+
+size_t MigrationChunkClonerSourceLegacy::CloneList::size() const {
+    stdx::unique_lock lk(_mutex);
+    return _recordIds.size();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::_startedOneInProgressRead(WithLock) {
+    _inProgressReads++;
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::_finishedOneInProgressRead() {
+    stdx::lock_guard lk(_mutex);
+    _inProgressReads--;
+    _moreDocsCV.notify_one();
+}
+
 } // namespace mongo