summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp132
1 files changed, 98 insertions, 34 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 0aed75b5303..a5079a4041d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/base/status.h"
+#include "mongo/bson/bsonobj.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog_raii.h"
@@ -711,6 +712,61 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
_jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
+boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
+ OperationContext* opCtx, const CollectionPtr& collection) {
+ while (true) {
+ stdx::unique_lock lk(_mutex);
+ invariant(_inProgressReads >= 0);
+ RecordId nextRecordId;
+ Snapshotted<BSONObj> doc;
+
+ _moreDocsCV.wait(lk, [&]() {
+ return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
+ _inProgressReads == 0;
+ });
+
+ // One of the following must now be true (corresponding to the three if conditions):
+ // 1. There is a document in the overflow set
+ // 2. The iterator has not reached the end of the record id set
+ // 3. The overflow set is empty, the iterator is at the end, and
+ // no threads are holding a document. This condition indicates
+ // that there are no more docs to return for the cloning phase.
+ if (!_overflowDocs.empty()) {
+ doc = std::move(_overflowDocs.front());
+ _overflowDocs.pop_front();
+ ++_inProgressReads;
+ return doc;
+ } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
+ nextRecordId = *_cloneRecordIdsIter;
+ ++_cloneRecordIdsIter;
+ ++_inProgressReads;
+ } else {
+ invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
+ return boost::none;
+ }
+
+ // In order to saturate the disk, the I/O operation must occur without
+ // holding the mutex.
+ lk.unlock();
+ if (collection->findDoc(opCtx, nextRecordId, &doc))
+ return doc;
+ lk.lock();
+ ++_numRecordsPassedOver;
+
+ // It is possible that this document is no longer in the collection,
+ // in which case, we try again and indicate to other threads that this
+ // thread is not holding a document.
+ --_inProgressReads;
+ _moreDocsCV.notify_one();
+ }
+}
+
+void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads >= 1);
+ _overflowDocs.push_back(std::move(doc));
+}
+
void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
@@ -718,49 +774,56 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- stdx::unique_lock<Latch> lk(_mutex);
- auto iter = _cloneLocs.begin();
+ while (auto doc = _getNextDoc(opCtx, collection)) {
+ ON_BLOCK_EXIT([&]() {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads > 0);
+ --_inProgressReads;
+ _moreDocsCV.notify_one();
+ });
- for (; iter != _cloneLocs.end(); ++iter) {
// We must always make progress in this method by at least one document because empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
+ _insertOverflowDoc(std::move(*doc));
break;
}
- auto nextRecordId = *iter;
-
- lk.unlock();
- ON_BLOCK_EXIT([&lk] { lk.lock(); });
-
- Snapshotted<BSONObj> doc;
- if (collection->findDoc(opCtx, nextRecordId, &doc)) {
- // Do not send documents that are no longer in the chunk range being moved. This can
- // happen when document shard key value of the document changed after the initial
- // index scan during cloning. This is needed because the destination is very
- // conservative in processing xferMod deletes and won't delete docs that are not in
- // the range of the chunk being migrated.
- if (!isDocInRange(doc.value(),
- _args.getMin().value(),
- _args.getMax().value(),
- _shardKeyPattern)) {
- continue;
+ // Do not send documents that are no longer in the chunk range being moved. This can
+    // happen when the shard key value of the document changed after the initial
+ // index scan during cloning. This is needed because the destination is very
+ // conservative in processing xferMod deletes and won't delete docs that are not in
+ // the range of the chunk being migrated.
+ if (!isDocInRange(
+ doc->value(), _args.getMin().value(), _args.getMax().value(), _shardKeyPattern)) {
+ {
+ stdx::lock_guard lk(_mutex);
+ _numRecordsPassedOver++;
}
+ continue;
+ }
- // Use the builder size instead of accumulating the document sizes directly so
- // that we take into consideration the overhead of BSONArray indices.
- if (arrBuilder->arrSize() &&
- (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) {
-
- break;
- }
+ // Use the builder size instead of accumulating the document sizes directly so
+ // that we take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
+ _insertOverflowDoc(std::move(*doc));
+ break;
+ }
- arrBuilder->append(doc.value());
- ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
+ {
+ stdx::lock_guard lk(_mutex);
+ _numRecordsCloned++;
}
+ arrBuilder->append(doc->value());
+ ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
- _cloneLocs.erase(_cloneLocs.begin(), iter);
+ // When we reach here, there are no more documents to return to the destination.
+    // We therefore need to notify other threads that may be sleeping on the condition
+ // variable that we are done.
+ stdx::lock_guard lk(_mutex);
+ _moreDocsCV.notify_one();
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -803,7 +866,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
{
// All clone data must have been drained before starting to fetch the incremental changes.
stdx::unique_lock<Latch> lk(_mutex);
- invariant(_cloneLocs.empty());
+ invariant(_cloneRecordIdsIter == _cloneLocs.end());
// The "snapshot" for delete and update list must be taken under a single lock. This is to
// ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1002,6 +1065,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
}
}
+ _cloneRecordIdsIter = _cloneLocs.begin();
} catch (DBException& exception) {
exception.addContext("Executor error while scanning for documents belonging to chunk");
throw;
@@ -1099,7 +1163,6 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
stdx::lock_guard<Latch> sl(_mutex);
- const std::size_t cloneLocsRemaining = _cloneLocs.size();
int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize +
_untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
@@ -1121,13 +1184,14 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"moveChunk data transfer progress",
"response"_attr = redact(res),
"memoryUsedBytes"_attr = _memoryUsed,
- "docsRemainingToClone"_attr = cloneLocsRemaining,
+ "docsRemainingToClone"_attr =
+ _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
}
if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
estimateUntransferredSessionsSize == 0) {
- if (cloneLocsRemaining != 0 ||
+ if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
return {ErrorCodes::OperationIncomplete,