Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 234
1 file changed, 156 insertions(+), 78 deletions(-)
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a5079a4041d..c324a8affe5 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -413,7 +413,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
}
} else {
invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState);
- invariant(_cloneLocs.empty());
+ invariant(!_cloneList.hasMore());
}
}
@@ -712,61 +712,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
_jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
-boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
- OperationContext* opCtx, const CollectionPtr& collection) {
- while (true) {
- stdx::unique_lock lk(_mutex);
- invariant(_inProgressReads >= 0);
- RecordId nextRecordId;
- Snapshotted<BSONObj> doc;
-
- _moreDocsCV.wait(lk, [&]() {
- return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
- _inProgressReads == 0;
- });
-
- // One of the following must now be true (corresponding to the three if conditions):
- // 1. There is a document in the overflow set
- // 2. The iterator has not reached the end of the record id set
- // 3. The overflow set is empty, the iterator is at the end, and
- // no threads are holding a document. This condition indicates
- // that there are no more docs to return for the cloning phase.
- if (!_overflowDocs.empty()) {
- doc = std::move(_overflowDocs.front());
- _overflowDocs.pop_front();
- ++_inProgressReads;
- return doc;
- } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
- nextRecordId = *_cloneRecordIdsIter;
- ++_cloneRecordIdsIter;
- ++_inProgressReads;
- } else {
- invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
- return boost::none;
- }
-
- // In order to saturate the disk, the I/O operation must occur without
- // holding the mutex.
- lk.unlock();
- if (collection->findDoc(opCtx, nextRecordId, &doc))
- return doc;
- lk.lock();
- ++_numRecordsPassedOver;
-
- // It is possible that this document is no longer in the collection,
- // in which case, we try again and indicate to other threads that this
- // thread is not holding a document.
- --_inProgressReads;
- _moreDocsCV.notify_one();
- }
-}
-
-void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
- stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads >= 1);
- _overflowDocs.push_back(std::move(doc));
-}
-
void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
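
The helper removed above waited on _moreDocsCV until one of three things was true: a pushed-back document was available on the overflow deque, unclaimed record ids remained, or nothing was left and no reader still held a document. That last clause is what lets waiting threads eventually observe that the clone phase is finished; the same protocol reappears below in CloneList::getNextDoc. A minimal standalone sketch of the pattern, using only the standard library and hypothetical names (SharedWorkList, fetchNext, pushBack, finishOne) rather than anything from this file:

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <mutex>
    #include <optional>
    #include <set>
    #include <thread>
    #include <vector>

    class SharedWorkList {
    public:
        explicit SharedWorkList(std::set<int> ids) : _ids(std::move(ids)), _iter(_ids.begin()) {}

        // Returns the next id to process, or nullopt once the list is fully drained.
        std::optional<int> fetchNext() {
            std::unique_lock lk(_mutex);
            // Wake when (1) an item was pushed back, (2) unclaimed ids remain, or
            // (3) nothing is left *and* no peer still holds an item. Without the third
            // clause, a thread could sleep forever after the last id has been claimed.
            _cv.wait(lk, [&] {
                return !_overflow.empty() || _iter != _ids.end() || _inProgress == 0;
            });
            if (!_overflow.empty()) {
                int id = _overflow.front();
                _overflow.pop_front();
                ++_inProgress;
                return id;
            }
            if (_iter != _ids.end()) {
                int id = *_iter++;
                ++_inProgress;
                return id;
            }
            // Drained: pass the wake-up along so every other waiter also sees this state.
            _cv.notify_one();
            return std::nullopt;
        }

        // A consumer that could not use the item it claimed hands it back here
        // (the role insertOverflowDoc plays in the real code).
        void pushBack(int id) {
            std::lock_guard lk(_mutex);
            _overflow.push_back(id);
        }

        // Called once per claimed item; the last finisher wakes a waiter so it can
        // either pick up a pushed-back item or observe that the list is drained.
        void finishOne() {
            std::lock_guard lk(_mutex);
            --_inProgress;
            _cv.notify_one();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        std::set<int> _ids;
        std::set<int>::iterator _iter;
        std::deque<int> _overflow;
        int _inProgress = 0;
    };

    int main() {
        SharedWorkList list({1, 2, 3, 4, 5, 6, 7, 8});
        std::vector<std::thread> workers;
        for (int t = 0; t < 4; ++t) {
            workers.emplace_back([&list] {
                while (auto id = list.fetchNext()) {
                    std::printf("processed %d\n", *id);
                    list.finishOne();
                }
            });
        }
        for (auto& w : workers) {
            w.join();
        }
        return 0;
    }

The extra notify_one in the drained branch mirrors how the production code chains wake-ups (via the destruction of the empty in-flight token) so that every waiting thread, not just the first one woken, gets to observe the finished state.
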
@@ -774,18 +719,24 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- while (auto doc = _getNextDoc(opCtx, collection)) {
- ON_BLOCK_EXIT([&]() {
+ while (true) {
+ int recordsNoLongerExist = 0;
+ auto docInFlight = _cloneList.getNextDoc(opCtx, collection, &recordsNoLongerExist);
+
+ if (recordsNoLongerExist) {
stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads > 0);
- --_inProgressReads;
- _moreDocsCV.notify_one();
- });
+ _numRecordsPassedOver += recordsNoLongerExist;
+ }
+
+ const auto& doc = docInFlight->getDoc();
+ if (!doc) {
+ break;
+ }
// We must always make progress in this method by at least one document because empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
- _insertOverflowDoc(std::move(*doc));
+ _cloneList.insertOverflowDoc(*doc);
break;
}
@@ -807,7 +758,7 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
// that we take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
(arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
- _insertOverflowDoc(std::move(*doc));
+ _cloneList.insertOverflowDoc(*doc);
break;
}
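
The loop above guards the batch in two ways: it only yields to the elapsed-time check once at least one document has been appended (an empty reply means the clone phase is over), and it keeps roughly 1 KB of headroom under BSONObjMaxUserSize so the BSON array's index keys and framing cannot push the message over the limit. In both cases the document in hand is parked on the CloneList overflow deque rather than dropped. A standalone sketch of the same two guards, with hypothetical names (buildOneBatch, kMaxBatchBytes) and purely illustrative byte figures:

    #include <chrono>
    #include <cstddef>
    #include <deque>
    #include <string>
    #include <vector>

    constexpr std::size_t kMaxBatchBytes = 16 * 1024 * 1024;  // stand-in for BSONObjMaxUserSize
    constexpr std::size_t kPerBatchOverhead = 1024;           // headroom for array index keys

    struct Batch {
        std::vector<std::string> docs;
        std::size_t bytes = 0;
    };

    // Builds one batch from `source`. A document that does not fit (by time or by size) is
    // parked at the front of `overflow` so the next call picks it up first; the batch always
    // makes progress by at least one document, because an empty batch means "no more data".
    Batch buildOneBatch(std::deque<std::string>& source,
                        std::deque<std::string>& overflow,
                        std::chrono::milliseconds timeBudget) {
        using Clock = std::chrono::steady_clock;
        const auto deadline = Clock::now() + timeBudget;

        Batch batch;
        while (!overflow.empty() || !source.empty()) {
            std::string doc;
            if (!overflow.empty()) {
                doc = std::move(overflow.front());
                overflow.pop_front();
            } else {
                doc = std::move(source.front());
                source.pop_front();
            }

            // Guard 1: only yield to the time budget once the batch already holds something.
            if (!batch.docs.empty() && Clock::now() > deadline) {
                overflow.push_front(std::move(doc));
                break;
            }
            // Guard 2: keep headroom so the serialized batch stays under the hard limit.
            if (!batch.docs.empty() &&
                batch.bytes + doc.size() + kPerBatchOverhead > kMaxBatchBytes) {
                overflow.push_front(std::move(doc));
                break;
            }

            batch.bytes += doc.size();
            batch.docs.push_back(std::move(doc));
        }
        return batch;
    }

    int main() {
        std::deque<std::string> source = {"alpha", "beta", "gamma"};
        std::deque<std::string> overflow;
        Batch batch = buildOneBatch(source, overflow, std::chrono::milliseconds(5));
        return batch.docs.empty() ? 1 : 0;
    }
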
@@ -818,12 +769,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
arrBuilder->append(doc->value());
ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
-
- // When we reach here, there are no more documents to return to the destination.
- // We therefore need to notify a other threads that maybe sleeping on the condition
- // variable that we are done.
- stdx::lock_guard lk(_mutex);
- _moreDocsCV.notify_one();
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -832,7 +777,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
return static_cast<uint64_t>(BSONObjMaxUserSize);
return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
- _averageObjectSizeForCloneLocs * _cloneLocs.size());
+ _averageObjectSizeForCloneLocs * _cloneList.size());
}
Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
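
getCloneBatchBufferAllocationSize above still pre-sizes the reply buffer to the smaller of one maximum-size BSON message and the estimated remaining payload; only the document count now comes from _cloneList.size(). A minimal sketch of that estimate with hypothetical names, treating the 16 MB figure purely as an illustrative constant:

    #include <algorithm>
    #include <cstdint>

    constexpr std::uint64_t kMaxUserSizeBytes = 16 * 1024 * 1024;  // illustrative 16 MB cap

    // Never reserve more than one maximum-size message, and never more than the
    // estimated payload that is still left to send.
    std::uint64_t cloneBatchAllocationSize(std::uint64_t averageDocSizeBytes,
                                           std::uint64_t docsRemaining) {
        return std::min(kMaxUserSizeBytes, averageDocSizeBytes * docsRemaining);
    }

    int main() {
        // e.g. 2 KiB average documents with 1000 documents remaining -> a ~2 MB reservation.
        return cloneBatchAllocationSize(2048, 1000) == 2048ull * 1000ull ? 0 : 1;
    }
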
@@ -866,7 +811,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
{
// All clone data must have been drained before starting to fetch the incremental changes.
stdx::unique_lock<Latch> lk(_mutex);
- invariant(_cloneRecordIdsIter == _cloneLocs.end());
+ invariant(!_cloneList.hasMore());
// The "snapshot" for delete and update list must be taken under a single lock. This is to
// ensure that we will preserve the causal order of writes. Always consume the delete
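
The comment above is the reason the delete and update lists are captured inside one critical section: taking them in two separate lock acquisitions could let a concurrent writer slip an update in between, so the recipient might observe an update whose preceding delete is missing. A standalone sketch of a single-lock, two-list snapshot with hypothetical names (ModsBuffer, snapshot):

    #include <mutex>
    #include <string>
    #include <utility>
    #include <vector>

    class ModsBuffer {
    public:
        void recordDelete(std::string id) {
            std::lock_guard lk(_mutex);
            _deletes.push_back(std::move(id));
        }
        void recordUpdate(std::string id) {
            std::lock_guard lk(_mutex);
            _updates.push_back(std::move(id));
        }

        // Both lists are drained under the same lock, so their relative order (a delete
        // recorded before an update) is preserved for whoever consumes the snapshot.
        std::pair<std::vector<std::string>, std::vector<std::string>> snapshot() {
            std::lock_guard lk(_mutex);
            return {std::exchange(_deletes, {}), std::exchange(_updates, {})};
        }

    private:
        std::mutex _mutex;
        std::vector<std::string> _deletes;
        std::vector<std::string> _updates;
    };

    int main() {
        ModsBuffer buf;
        buf.recordDelete("d1");
        buf.recordUpdate("u1");
        auto [deletes, updates] = buf.snapshot();
        return (deletes.size() == 1 && updates.size() == 1) ? 0 : 1;
    }
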
@@ -1045,6 +990,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
try {
BSONObj obj;
RecordId recordId;
+ RecordIdSet recordIdSet;
+
while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) {
Status interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
@@ -1052,20 +999,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
if (!isLargeChunk) {
- stdx::lock_guard<Latch> lk(_mutex);
- _cloneLocs.insert(recordId);
+ recordIdSet.insert(recordId);
}
if (++recCount > maxRecsWhenFull) {
isLargeChunk = true;
if (_forceJumbo) {
- _cloneLocs.clear();
+ recordIdSet.clear();
break;
}
}
}
- _cloneRecordIdsIter = _cloneLocs.begin();
+
+ _cloneList.populateList(std::move(recordIdSet));
} catch (DBException& exception) {
exception.addContext("Executor error while scanning for documents belonging to chunk");
throw;
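
_storeCurrentLocs now collects record ids into a local RecordIdSet while the executor scans the chunk and hands the whole set to CloneList::populateList in a single call, rather than taking the cloner mutex once per record as the deleted lines did. A standalone sketch of that "scan locally, publish once" shape with hypothetical names (SharedCloneList, populate) and std::set<long long> standing in for RecordIdSet:

    #include <mutex>
    #include <set>
    #include <utility>
    #include <vector>

    class SharedCloneList {
    public:
        // One lock acquisition publishes the whole set, instead of locking per record
        // while the scan is still running.
        void populate(std::set<long long> recordIds) {
            std::lock_guard lk(_mutex);
            _recordIds = std::move(recordIds);
            // Reset alongside the set: any previous iterator would dangle after the move.
            _iter = _recordIds.begin();
        }

    private:
        std::mutex _mutex;
        std::set<long long> _recordIds;
        std::set<long long>::iterator _iter;
    };

    int main() {
        std::vector<long long> scanned = {10, 7, 42};  // stand-in for the index scan output
        std::set<long long> local;
        for (long long rid : scanned) {
            local.insert(rid);  // no shared lock held during the scan
        }
        SharedCloneList list;
        list.populate(std::move(local));
        return 0;
    }
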
@@ -1185,13 +1132,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"response"_attr = redact(res),
"memoryUsedBytes"_attr = _memoryUsed,
"docsRemainingToClone"_attr =
- _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
+ _cloneList.size() - _numRecordsCloned - _numRecordsPassedOver,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
}
if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
estimateUntransferredSessionsSize == 0) {
- if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
+ if (_cloneList.hasMore() ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
return {ErrorCodes::OperationIncomplete,
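
The check above previously spelled out "record ids left to hand out or documents parked on the overflow deque"; CloneList::hasMore() now answers the broader question of whether any clone work remains at all. A standalone sketch of such an aggregate check, with hypothetical names and with the exact combination of terms treated as an assumption of the sketch:

    #include <deque>
    #include <mutex>
    #include <set>
    #include <string>
    #include <utility>

    class CloneProgress {
    public:
        explicit CloneProgress(std::set<long long> ids)
            : _ids(std::move(ids)), _iter(_ids.begin()) {}

        // Cloning is only finished once the id iterator is at the end, nothing is parked
        // on the overflow deque, and no reader still holds a document in flight.
        bool hasMore() const {
            std::lock_guard lk(_mutex);
            return _iter != _ids.cend() || !_overflow.empty() || _inProgressReads > 0;
        }

    private:
        mutable std::mutex _mutex;
        std::set<long long> _ids;
        std::set<long long>::const_iterator _iter;
        std::deque<std::string> _overflow;
        int _inProgressReads = 0;
    };

    int main() {
        CloneProgress progress({1, 2, 3});
        return progress.hasMore() ? 0 : 1;  // ids remain, so cloning is not finished
    }
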
@@ -1335,4 +1282,135 @@ MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch()
return _sessionCatalogSource->getNotificationForNewOplog();
}
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::DocumentInFlightWithLock(
+ WithLock lock, MigrationChunkClonerSourceLegacy::CloneList& clonerList)
+ : _inProgressReadToken(
+ std::make_unique<MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken>(
+ lock, clonerList)) {}
+
+void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::setDoc(
+ boost::optional<Snapshotted<BSONObj>> doc) {
+ _doc = std::move(doc);
+}
+
+std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::release() {
+ invariant(_inProgressReadToken);
+
+ return std::make_unique<
+ MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>(
+ std::move(_inProgressReadToken), std::move(_doc));
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::
+ DocumentInFlightWhileNotInLock(
+ std::unique_ptr<CloneList::InProgressReadToken> inProgressReadToken,
+ boost::optional<Snapshotted<BSONObj>> doc)
+ : _inProgressReadToken(std::move(inProgressReadToken)), _doc(std::move(doc)) {}
+
+void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::setDoc(
+ boost::optional<Snapshotted<BSONObj>> doc) {
+ _doc = std::move(doc);
+}
+
+const boost::optional<Snapshotted<BSONObj>>&
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::getDoc() {
+ return _doc;
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::InProgressReadToken(
+ WithLock withLock, CloneList& cloneList)
+ : _cloneList(cloneList) {
+ _cloneList._startedOneInProgressRead(withLock);
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::~InProgressReadToken() {
+ _cloneList._finishedOneInProgressRead();
+}
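
InProgressReadToken turns the old ON_BLOCK_EXIT bookkeeping into an RAII type: it is constructed while the caller already holds the CloneList mutex (the WithLock parameter is the witness for that), bumps _inProgressReads, and on destruction re-takes the mutex, decrements the counter, and signals _moreDocsCV, so the count is released on every exit path including exceptions. A standalone sketch of the same idea with hypothetical names (ReadRegistry, ReadToken); a unique_lock reference stands in for WithLock:

    #include <cassert>
    #include <condition_variable>
    #include <mutex>

    class ReadRegistry {
    public:
        class ReadToken {
        public:
            // The unique_lock reference acts as a witness that the caller holds the mutex,
            // similar in spirit to WithLock.
            ReadToken(std::unique_lock<std::mutex>& heldLock, ReadRegistry& registry)
                : _registry(registry) {
                assert(heldLock.owns_lock());
                ++_registry._inProgressReads;
            }
            // Runs on every exit path. Note that, like the real token, it must be destroyed
            // while the mutex is NOT held, or re-acquiring it here would deadlock.
            ~ReadToken() {
                std::lock_guard lk(_registry._mutex);
                --_registry._inProgressReads;
                _registry._moreDocsCV.notify_one();
            }
            ReadToken(const ReadToken&) = delete;
            ReadToken& operator=(const ReadToken&) = delete;

        private:
            ReadRegistry& _registry;
        };

        // Members left public only to keep the sketch short.
        std::mutex _mutex;
        std::condition_variable _moreDocsCV;
        int _inProgressReads = 0;
    };

    int main() {
        ReadRegistry registry;
        {
            std::unique_lock lk(registry._mutex);
            ReadRegistry::ReadToken token(lk, registry);  // counter goes 0 -> 1
            lk.unlock();                                  // the slow read happens unlocked
        }                                                 // token destroyed: counter back to 0
        return registry._inProgressReads == 0 ? 0 : 1;
    }
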
+
+MigrationChunkClonerSourceLegacy::CloneList::CloneList() {
+ _recordIdsIter = _recordIds.begin();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::populateList(RecordIdSet recordIds) {
+ stdx::lock_guard lk(_mutex);
+ _recordIds = std::move(recordIds);
+ _recordIdsIter = _recordIds.begin();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::insertOverflowDoc(Snapshotted<BSONObj> doc) {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads >= 1);
+ _overflowDocs.push_back(std::move(doc));
+}
+
+bool MigrationChunkClonerSourceLegacy::CloneList::hasMore() const {
+ stdx::lock_guard lk(_mutex);
+ return _recordIdsIter != _recordIds.cend() || _inProgressReads > 0;
+}
+
+std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
+MigrationChunkClonerSourceLegacy::CloneList::getNextDoc(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ int* numRecordsNoLongerExist) {
+ while (true) {
+ stdx::unique_lock lk(_mutex);
+ invariant(_inProgressReads >= 0);
+ RecordId nextRecordId;
+
+ opCtx->waitForConditionOrInterrupt(_moreDocsCV, lk, [&]() {
+ return _recordIdsIter != _recordIds.end() || !_overflowDocs.empty() ||
+ _inProgressReads == 0;
+ });
+
+ DocumentInFlightWithLock docInFlight(lk, *this);
+
+ // One of the following must now be true (corresponding to the three if conditions):
+ // 1. There is a document in the overflow set
+ // 2. The iterator has not reached the end of the record id set
+ // 3. The overflow set is empty, the iterator is at the end, and
+ // no threads are holding a document. This condition indicates
+ // that there are no more docs to return for the cloning phase.
+ if (!_overflowDocs.empty()) {
+ docInFlight.setDoc(std::move(_overflowDocs.front()));
+ _overflowDocs.pop_front();
+ return docInFlight.release();
+ } else if (_recordIdsIter != _recordIds.end()) {
+ nextRecordId = *_recordIdsIter;
+ ++_recordIdsIter;
+ } else {
+ return docInFlight.release();
+ }
+
+ lk.unlock();
+
+ auto docInFlightWhileNotLocked = docInFlight.release();
+
+ Snapshotted<BSONObj> doc;
+ if (collection->findDoc(opCtx, nextRecordId, &doc)) {
+ docInFlightWhileNotLocked->setDoc(std::move(doc));
+ return docInFlightWhileNotLocked;
+ }
+
+ if (numRecordsNoLongerExist) {
+ (*numRecordsNoLongerExist)++;
+ }
+ }
+}
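
getNextDoc claims its work while holding the mutex (an overflow document if one is parked, otherwise the next record id), then releases the mutex before calling collection->findDoc so that several reader threads can keep the disk saturated; if the record has been deleted since _storeCurrentLocs ran, the miss is reported through numRecordsNoLongerExist and the loop claims the next id. A deliberately simplified, single-threaded sketch of that claim-then-read-then-retry shape (no overflow deque, no condition variable), with hypothetical names (RecordStore, fetchNextAlive) and an in-memory map standing in for the collection lookup:

    #include <map>
    #include <mutex>
    #include <optional>
    #include <set>
    #include <string>

    struct RecordStore {
        std::mutex mutex;
        std::set<long long> ids = {1, 2, 3};
        std::set<long long>::iterator iter = ids.begin();
        std::map<long long, std::string> storage = {{1, "doc1"}, {3, "doc3"}};  // id 2 was deleted
    };

    std::optional<std::string> fetchNextAlive(RecordStore& store, int* missingRecords) {
        while (true) {
            long long recordId;
            {
                std::lock_guard lk(store.mutex);
                if (store.iter == store.ids.end()) {
                    return std::nullopt;  // nothing left to claim
                }
                recordId = *store.iter++;  // claim the id under the lock...
            }
            // ...but do the lookup (the stand-in for the disk read) without holding it.
            auto it = store.storage.find(recordId);
            if (it != store.storage.end()) {
                return it->second;
            }
            if (missingRecords) {
                ++*missingRecords;  // record vanished since the scan; try the next id
            }
        }
    }

    int main() {
        RecordStore store;
        int missing = 0;
        int found = 0;
        while (fetchNextAlive(store, &missing)) {
            ++found;
        }
        return (found == 2 && missing == 1) ? 0 : 1;
    }
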
+
+size_t MigrationChunkClonerSourceLegacy::CloneList::size() const {
+ stdx::unique_lock lk(_mutex);
+ return _recordIds.size();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::_startedOneInProgressRead(WithLock) {
+ _inProgressReads++;
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::_finishedOneInProgressRead() {
+ stdx::lock_guard lk(_mutex);
+ _inProgressReads--;
+ _moreDocsCV.notify_one();
+}
+
} // namespace mongo