author     Randolph Tan <randolph@10gen.com>  2023-03-02 15:54:36 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2023-03-02 17:26:25 +0000
commit     3f72fcd96b8ce1738d76d529baa01a2db0a62ac1 (patch)
tree       f408922144963ceccfad6c4af128265d89b71463
parent     5321501d5dc7814641fc3ba6e5535b038ddbf4dd (diff)
download   mongo-3f72fcd96b8ce1738d76d529baa01a2db0a62ac1.tar.gz
Revert "SERVER-72619 Refactor and add more testing for migration_chunk_cloner_source changes"
This reverts commit 6659be6a462ee241f6b0a80a27ae9440bebf9216.
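For context, this revert reinstates the pre-refactor design in which the cloner itself owns the record-id set, the overflow deque, the in-progress-read counter, and the condition variable, instead of delegating them to the CloneList helper introduced by SERVER-72619. Below is a minimal standalone sketch of that hand-over-hand iteration scheme, assuming std:: types in place of MongoDB's stdx:: wrappers, a placeholder Doc type in place of Snapshotted<BSONObj>, and a fetch that always succeeds (the real _getNextDoc retries when a record id no longer resolves to a document):

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <set>
#include <string>

struct Doc {
    long recordId;
    std::string body;
};

class CloneSource {
public:
    explicit CloneSource(std::set<long> recordIds) : _cloneLocs(std::move(recordIds)) {
        _cloneRecordIdsIter = _cloneLocs.begin();
    }

    // Returns std::nullopt only once every record id has been handed out, the
    // overflow deque is empty, and no other thread still holds a document.
    std::optional<Doc> getNextDoc() {
        std::unique_lock<std::mutex> lk(_mutex);
        _moreDocsCV.wait(lk, [&] {
            return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
                _inProgressReads == 0;
        });

        if (!_overflowDocs.empty()) {
            Doc doc = std::move(_overflowDocs.front());
            _overflowDocs.pop_front();
            ++_inProgressReads;
            return doc;
        }
        if (_cloneRecordIdsIter != _cloneLocs.end()) {
            long id = *_cloneRecordIdsIter++;
            ++_inProgressReads;
            lk.unlock();  // the real code performs the fetch I/O without the mutex
            return Doc{id, "payload for " + std::to_string(id)};
        }
        // Done: cascade the wakeup so every other waiting consumer also
        // observes the terminal state.
        _moreDocsCV.notify_one();
        return std::nullopt;
    }

    // Puts back a document that did not fit in the caller's response batch.
    void insertOverflowDoc(Doc doc) {
        std::lock_guard<std::mutex> lk(_mutex);
        _overflowDocs.push_back(std::move(doc));
    }

    // Every successful getNextDoc() must be paired with one finishedRead().
    void finishedRead() {
        std::lock_guard<std::mutex> lk(_mutex);
        --_inProgressReads;
        _moreDocsCV.notify_one();
    }

private:
    std::mutex _mutex;
    std::condition_variable _moreDocsCV;
    std::set<long> _cloneLocs;
    std::set<long>::iterator _cloneRecordIdsIter;
    std::deque<Doc> _overflowDocs;
    std::size_t _inProgressReads = 0;
};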
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp       234
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h         218
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp  655
3 files changed, 133 insertions(+), 974 deletions(-)
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 1f11e168733..d56601cb85b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -427,7 +427,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
}
} else {
invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState);
- invariant(_cloneList.hasMore());
+ invariant(_cloneLocs.empty());
}
}
@@ -726,6 +726,61 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
_jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
+boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
+ OperationContext* opCtx, const CollectionPtr& collection) {
+ while (true) {
+ stdx::unique_lock lk(_mutex);
+ invariant(_inProgressReads >= 0);
+ RecordId nextRecordId;
+ Snapshotted<BSONObj> doc;
+
+ _moreDocsCV.wait(lk, [&]() {
+ return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
+ _inProgressReads == 0;
+ });
+
+ // One of the following must now be true (corresponding to the three if conditions):
+ // 1. There is a document in the overflow set
+ // 2. The iterator has not reached the end of the record id set
+ // 3. The overflow set is empty, the iterator is at the end, and
+ // no threads are holding a document. This condition indicates
+ // that there are no more docs to return for the cloning phase.
+ if (!_overflowDocs.empty()) {
+ doc = std::move(_overflowDocs.front());
+ _overflowDocs.pop_front();
+ ++_inProgressReads;
+ return doc;
+ } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
+ nextRecordId = *_cloneRecordIdsIter;
+ ++_cloneRecordIdsIter;
+ ++_inProgressReads;
+ } else {
+ invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
+ return boost::none;
+ }
+
+ // In order to saturate the disk, the I/O operation must occur without
+ // holding the mutex.
+ lk.unlock();
+ if (collection->findDoc(opCtx, nextRecordId, &doc))
+ return doc;
+ lk.lock();
+ ++_numRecordsPassedOver;
+
+ // It is possible that this document is no longer in the collection,
+ // in which case, we try again and indicate to other threads that this
+ // thread is not holding a document.
+ --_inProgressReads;
+ _moreDocsCV.notify_one();
+ }
+}
+
+void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads >= 1);
+ _overflowDocs.push_back(std::move(doc));
+}
+
void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
@@ -733,24 +788,18 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- while (true) {
- int recordsNoLongerExist = 0;
- auto docInFlight = _cloneList.getNextDoc(opCtx, collection, &recordsNoLongerExist);
-
- if (recordsNoLongerExist) {
+ while (auto doc = _getNextDoc(opCtx, collection)) {
+ ON_BLOCK_EXIT([&]() {
stdx::lock_guard lk(_mutex);
- _numRecordsPassedOver += recordsNoLongerExist;
- }
-
- const auto& doc = docInFlight->getDoc();
- if (!doc) {
- break;
- }
+ invariant(_inProgressReads > 0);
+ --_inProgressReads;
+ _moreDocsCV.notify_one();
+ });
// We must always make progress in this method by at least one document because empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
- _cloneList.insertOverflowDoc(*doc);
+ _insertOverflowDoc(std::move(*doc));
break;
}
@@ -772,7 +821,7 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
// that we take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
(arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
- _cloneList.insertOverflowDoc(*doc);
+ _insertOverflowDoc(std::move(*doc));
break;
}
@@ -783,6 +832,12 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
arrBuilder->append(doc->value());
ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
+
// When we reach here, there are no more documents to return to the destination.
// We therefore need to notify any other threads that may be sleeping on the
// condition variable that we are done.
+ stdx::lock_guard lk(_mutex);
+ _moreDocsCV.notify_one();
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -791,7 +846,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
return static_cast<uint64_t>(BSONObjMaxUserSize);
return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
- _averageObjectSizeForCloneLocs * _cloneList.size());
+ _averageObjectSizeForCloneLocs * _cloneLocs.size());
}
Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
@@ -892,7 +947,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
{
// All clone data must have been drained before starting to fetch the incremental changes.
stdx::unique_lock<Latch> lk(_mutex);
- invariant(!_cloneList.hasMore());
+ invariant(_cloneRecordIdsIter == _cloneLocs.end());
// The "snapshot" for delete and update list must be taken under a single lock. This is to
// ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1074,8 +1129,6 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
try {
BSONObj obj;
RecordId recordId;
- RecordIdSet recordIdSet;
-
while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) {
Status interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
@@ -1083,20 +1136,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
if (!isLargeChunk) {
- recordIdSet.insert(recordId);
+ stdx::lock_guard<Latch> lk(_mutex);
+ _cloneLocs.insert(recordId);
}
if (++recCount > maxRecsWhenFull) {
isLargeChunk = true;
if (_forceJumbo) {
- recordIdSet.clear();
+ _cloneLocs.clear();
break;
}
}
}
-
- _cloneList.populateList(std::move(recordIdSet));
+ _cloneRecordIdsIter = _cloneLocs.begin();
} catch (DBException& exception) {
exception.addContext("Executor error while scanning for documents belonging to chunk");
throw;
@@ -1217,13 +1270,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"response"_attr = redact(res),
"memoryUsedBytes"_attr = _memoryUsed,
"docsRemainingToClone"_attr =
- _cloneList.size() - _numRecordsCloned - _numRecordsPassedOver,
+ _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
}
if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
estimateUntransferredSessionsSize == 0) {
- if (_cloneList.hasMore() ||
+ if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
return {ErrorCodes::OperationIncomplete,
@@ -1368,135 +1421,4 @@ MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch()
return _sessionCatalogSource->getNotificationForNewOplog();
}
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::DocumentInFlightWithLock(
- WithLock lock, MigrationChunkClonerSourceLegacy::CloneList& clonerList)
- : _inProgressReadToken(
- std::make_unique<MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken>(
- lock, clonerList)) {}
-
-void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::setDoc(
- boost::optional<Snapshotted<BSONObj>> doc) {
- _doc = std::move(doc);
-}
-
-std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::release() {
- invariant(_inProgressReadToken);
-
- return std::make_unique<
- MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>(
- std::move(_inProgressReadToken), std::move(_doc));
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::
- DocumentInFlightWhileNotInLock(
- std::unique_ptr<CloneList::InProgressReadToken> inProgressReadToken,
- boost::optional<Snapshotted<BSONObj>> doc)
- : _inProgressReadToken(std::move(inProgressReadToken)), _doc(std::move(doc)) {}
-
-void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::setDoc(
- boost::optional<Snapshotted<BSONObj>> doc) {
- _doc = std::move(doc);
-}
-
-const boost::optional<Snapshotted<BSONObj>>&
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::getDoc() {
- return _doc;
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::InProgressReadToken(
- WithLock withLock, CloneList& cloneList)
- : _cloneList(cloneList) {
- _cloneList._startedOneInProgressRead(withLock);
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::~InProgressReadToken() {
- _cloneList._finishedOneInProgressRead();
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::CloneList() {
- _recordIdsIter = _recordIds.begin();
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::populateList(RecordIdSet recordIds) {
- stdx::lock_guard lk(_mutex);
- _recordIds = std::move(recordIds);
- _recordIdsIter = _recordIds.begin();
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::insertOverflowDoc(Snapshotted<BSONObj> doc) {
- stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads >= 1);
- _overflowDocs.push_back(std::move(doc));
-}
-
-bool MigrationChunkClonerSourceLegacy::CloneList::hasMore() const {
- stdx::lock_guard lk(_mutex);
- return _recordIdsIter != _recordIds.cend() && _inProgressReads > 0;
-}
-
-std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
-MigrationChunkClonerSourceLegacy::CloneList::getNextDoc(OperationContext* opCtx,
- const CollectionPtr& collection,
- int* numRecordsNoLongerExist) {
- while (true) {
- stdx::unique_lock lk(_mutex);
- invariant(_inProgressReads >= 0);
- RecordId nextRecordId;
-
- opCtx->waitForConditionOrInterrupt(_moreDocsCV, lk, [&]() {
- return _recordIdsIter != _recordIds.end() || !_overflowDocs.empty() ||
- _inProgressReads == 0;
- });
-
- DocumentInFlightWithLock docInFlight(lk, *this);
-
- // One of the following must now be true (corresponding to the three if conditions):
- // 1. There is a document in the overflow set
- // 2. The iterator has not reached the end of the record id set
- // 3. The overflow set is empty, the iterator is at the end, and
- // no threads are holding a document. This condition indicates
- // that there are no more docs to return for the cloning phase.
- if (!_overflowDocs.empty()) {
- docInFlight.setDoc(std::move(_overflowDocs.front()));
- _overflowDocs.pop_front();
- return docInFlight.release();
- } else if (_recordIdsIter != _recordIds.end()) {
- nextRecordId = *_recordIdsIter;
- ++_recordIdsIter;
- } else {
- return docInFlight.release();
- }
-
- lk.unlock();
-
- auto docInFlightWhileNotLocked = docInFlight.release();
-
- Snapshotted<BSONObj> doc;
- if (collection->findDoc(opCtx, nextRecordId, &doc)) {
- docInFlightWhileNotLocked->setDoc(std::move(doc));
- return docInFlightWhileNotLocked;
- }
-
- if (numRecordsNoLongerExist) {
- (*numRecordsNoLongerExist)++;
- }
- }
-}
-
-size_t MigrationChunkClonerSourceLegacy::CloneList::size() const {
- stdx::unique_lock lk(_mutex);
- return _recordIds.size();
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::_startedOneInProgressRead(WithLock) {
- _inProgressReads++;
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::_finishedOneInProgressRead() {
- stdx::lock_guard lk(_mutex);
- _inProgressReads--;
- _moreDocsCV.notify_one();
-}
-
} // namespace mongo
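The consumer side restored in _nextCloneBatchFromCloneLocs pairs every document obtained from _getNextDoc with an ON_BLOCK_EXIT guard that decrements _inProgressReads and wakes one waiter, and it always makes progress by at least one document per batch. A sketch of that loop against the CloneSource class above, with ScopeGuard standing in for mongo's ON_BLOCK_EXIT and the BSONObjMaxUserSize/yield-tracker checks reduced to a plain byte budget (maxBatchBytes is an illustrative parameter, not a MongoDB name):

#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> f) : _f(std::move(f)) {}
    ~ScopeGuard() { _f(); }

private:
    std::function<void()> _f;
};

std::vector<Doc> nextCloneBatch(CloneSource& source, std::size_t maxBatchBytes) {
    std::vector<Doc> batch;
    std::size_t batchBytes = 0;
    while (auto doc = source.getNextDoc()) {
        // Whatever happens below, record that this thread no longer holds the
        // document and wake one thread blocked in getNextDoc().
        ScopeGuard guard([&] { source.finishedRead(); });

        // If the document does not fit, hand it to another consumer via the
        // overflow deque; the empty-batch check guarantees forward progress.
        if (!batch.empty() && batchBytes + doc->body.size() > maxBatchBytes) {
            source.insertOverflowDoc(std::move(*doc));
            break;
        }
        batchBytes += doc->body.size();
        batch.push_back(std::move(*doc));
    }
    return batch;  // an empty batch signals that the clone phase is drained
}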
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 4fff7da8d17..7a59f6029bc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -228,168 +228,6 @@ private:
friend class LogOpForShardingHandler;
friend class LogTransactionOperationsForShardingHandler;
- using RecordIdSet = std::set<RecordId>;
-
- /**
- * This is responsible for all the logic revolving around handling documents that needs to be
- * cloned.
- *
- * This class is multithread-safe.
- */
- class CloneList {
- public:
- /**
- * Simple container that increments the given counter when this is constructed and
- * decrements it when it is destroyed. User of this class is responsible for holding
- * necessary mutexes when counter is being modified.
- */
- class InProgressReadToken {
- public:
- InProgressReadToken(WithLock, CloneList& cloneList);
- InProgressReadToken(const InProgressReadToken&) = delete;
- InProgressReadToken(InProgressReadToken&&) = default;
-
- ~InProgressReadToken();
-
- private:
- CloneList& _cloneList;
- };
-
- /**
- * Container for a document that can be added to the nextCloneBatch call. As long as
- * instances of this object exist, it will prevent getNextDoc from prematurely returning
- * an empty response (which means there are no more docs left to clone).
- *
- * This assumes that _mutex is not being held when it is destroyed.
- */
- class DocumentInFlightWhileNotInLock {
- public:
- DocumentInFlightWhileNotInLock(std::unique_ptr<InProgressReadToken> inProgressReadToken,
- boost::optional<Snapshotted<BSONObj>> doc);
- DocumentInFlightWhileNotInLock(const DocumentInFlightWhileNotInLock&) = delete;
- DocumentInFlightWhileNotInLock(DocumentInFlightWhileNotInLock&&) = default;
-
- void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
- const boost::optional<Snapshotted<BSONObj>>& getDoc();
-
- private:
- std::unique_ptr<InProgressReadToken> _inProgressReadToken;
- boost::optional<Snapshotted<BSONObj>> _doc;
- };
-
- /**
- * A variant of the DocumentInFlightWhileNotInLock where the _mutex should be held while it
- * has a document contained within it.
- */
- class DocumentInFlightWithLock {
- public:
- DocumentInFlightWithLock(WithLock, CloneList& clonerList);
- DocumentInFlightWithLock(const DocumentInFlightWithLock&) = delete;
- DocumentInFlightWithLock(DocumentInFlightWithLock&&) = default;
-
- void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
-
- /**
- * Releases the contained document. Can only be called once for the entire lifetime
- * of this object.
- */
- std::unique_ptr<DocumentInFlightWhileNotInLock> release();
-
- private:
- std::unique_ptr<InProgressReadToken> _inProgressReadToken;
- boost::optional<Snapshotted<BSONObj>> _doc;
- };
-
- CloneList();
-
- /**
- * Overwrites the list of record ids to clone.
- */
- void populateList(RecordIdSet recordIds);
-
- /**
- * Returns a document to clone. If there are no more documents left to clone,
- * DocumentInFlightWhileNotInLock::getDoc will return boost::none.
- *
- * numRecordsNoLongerExist is an optional out-parameter that can be used to track
- * the number of recordIds encountered that refer to a document that no longer
- * exists.
- */
- std::unique_ptr<DocumentInFlightWhileNotInLock> getNextDoc(OperationContext* opCtx,
- const CollectionPtr& collection,
- int* numRecordsNoLongerExist);
-
- /**
- * Put back a document previously obtained from this CloneList instance to the overflow
- * pool.
- */
- void insertOverflowDoc(Snapshotted<BSONObj> doc);
-
- /**
- * Returns true if there are more documents to clone.
- */
- bool hasMore() const;
-
- /**
- * Returns the size of the populated record ids.
- */
- size_t size() const;
-
- private:
- /**
- * Increments the counter for inProgressReads.
- */
- void _startedOneInProgressRead(WithLock);
-
- /**
- * Decrements the counter for inProgressReads.
- */
- void _finishedOneInProgressRead();
-
- mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSource::CloneList::_mutex");
-
- RecordIdSet _recordIds;
-
- // This iterator is a pointer into the _recordIds set. It allows concurrent access to
- // the _recordIds set by allowing threads servicing _migrateClone requests to do the
- // following:
- // 1. Acquire mutex "_mutex" above.
- // 2. Copy *_recordIdsIter into its local stack frame.
- // 3. Increment _recordIdsIter
- // 4. Unlock "_mutex."
- // 5. Do the I/O to fetch the document corresponding to this record Id.
- //
- // The purpose of this algorithm, is to allow different threads to concurrently start I/O
- // jobs in order to more fully saturate the disk.
- //
- // One issue with this algorithm, is that only 16MB worth of documents can be returned in
- // response to a _migrateClone request. But, the thread does not know the size of a
- // document until it does the I/O. At which point, if the document does not fit in the
- // response to _migrateClone request the document must be made available to a different
- // thread servicing a _migrateClone request. To solve this problem, the thread adds the
- // document to the below _overflowDocs deque.
- RecordIdSet::iterator _recordIdsIter;
-
- // This deque stores all documents that must be sent to the destination, but could not fit
- // in the response to a particular _migrateClone request.
- std::deque<Snapshotted<BSONObj>> _overflowDocs;
-
- // This integer represents how many documents are being "held" by threads servicing
- // _migrateClone requests. Any document that is "held" by a thread may be added to the
- // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
- // This integer is necessary because it gives us a condition on when all documents to be
- // sent to the destination have been exhausted.
- //
- // If (_recordIdsIter == _recordIds.end() && _overflowDocs.empty() &&
- // _inProgressReads == 0) then all documents have been returned to the destination.
- RecordIdSet::size_type _inProgressReads = 0;
-
- // This condition variable allows us to wait on the following condition:
- // Either we're done and the above condition is satisfied, or there is some document to
- // return.
- stdx::condition_variable _moreDocsCV;
- };
-
// Represents the states in which the cloner can be
enum State { kNew, kCloning, kDone };
@@ -427,6 +265,15 @@ private:
Status _storeCurrentLocs(OperationContext* opCtx);
/**
+ * Returns boost::none if there are no more documents to get.
+ * Increments _inProgressReads if and only if return value is not none.
+ */
+ boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx,
+ const CollectionPtr& collection);
+
+ void _insertOverflowDoc(Snapshotted<BSONObj> doc);
+
+ /**
* Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
* part of session migration.
*/
@@ -531,10 +378,49 @@ private:
// The current state of the cloner
State _state{kNew};
- CloneList _cloneList;
-
- RecordIdSet::size_type _numRecordsCloned{0};
- RecordIdSet::size_type _numRecordsPassedOver{0};
+ // List of record ids that needs to be transferred (initial clone)
+ std::set<RecordId> _cloneLocs;
+
+ // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to
+ // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the
+ // following:
+ // 1. Acquire mutex "_mutex" above.
+ // 2. Copy *_cloneRecordIdsIter into its local stack frame.
+ // 3. Increment _cloneRecordIdsIter
+ // 4. Unlock "_mutex."
+ // 5. Do the I/O to fetch the document corresponding to this record Id.
+ //
+ // The purpose of this algorithm is to allow different threads to concurrently start I/O jobs
+ // in order to more fully saturate the disk.
+ //
+ // One issue with this algorithm is that only 16MB worth of documents can be returned in
+ // response to a _migrateClone request. But the thread does not know the size of a document
+ // until it does the I/O, at which point, if the document does not fit in the response to
+ // the _migrateClone request, it must be made available to a different thread servicing a
+ // _migrateClone request. To solve this problem, the thread adds the document to the
+ // _overflowDocs deque below.
+ std::set<RecordId>::iterator _cloneRecordIdsIter;
+
+ // This deque stores all documents that must be sent to the destination, but could not fit
+ // in the response to a particular _migrateClone request.
+ std::deque<Snapshotted<BSONObj>> _overflowDocs;
+
+ // This integer represents how many documents are being "held" by threads servicing
+ // _migrateClone requests. Any document that is "held" by a thread may be added to the
+ // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
+ // This integer is necessary because it gives us a condition on when all documents to be sent
+ // to the destination have been exhausted.
+ //
+ // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads
+ // == 0) then all documents have been returned to the destination.
+ decltype(_cloneLocs.size()) _inProgressReads = 0;
+
+ // This condition variable allows us to wait on the following condition:
+ // Either we're done and the above condition is satisfied, or there is some document to
+ // return.
+ stdx::condition_variable _moreDocsCV;
+ decltype(_cloneLocs.size()) _numRecordsCloned{0};
+ decltype(_cloneLocs.size()) _numRecordsPassedOver{0};
// The estimated average object size during the clone phase. Used for buffer size
// pre-allocation (initial clone).
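The long comment above encodes the termination invariant: once _cloneRecordIdsIter == _cloneLocs.end(), _overflowDocs is empty, and _inProgressReads == 0, every consumer can conclude the clone phase is drained. A tiny driver for the two sketches above exercises that invariant with concurrent consumers (the small byte budget is chosen only to force overflow hand-offs between threads):

#include <cstddef>
#include <cstdio>
#include <thread>

int main() {
    CloneSource source({1, 2, 3, 4, 5, 6, 7, 8});
    auto worker = [&source] {
        std::size_t cloned = 0;
        while (true) {
            auto batch = nextCloneBatch(source, /*maxBatchBytes=*/32);
            if (batch.empty())
                break;  // terminal state reached: nothing left to clone
            cloned += batch.size();
        }
        std::printf("worker cloned %zu docs\n", cloned);
    };
    std::thread t1(worker), t2(worker);
    t1.join();
    t2.join();
    return 0;
}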
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index c062c8e2fe3..91e1b4a21bc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -66,476 +66,6 @@ const ConnectionString kRecipientConnStr =
HostAndPort("RecipientHost2:1234"),
HostAndPort("RecipientHost3:1234")});
-class CollectionWithFault : public Collection {
-public:
- CollectionWithFault(const Collection* originalCollection) : _coll(originalCollection) {}
-
- void setFindDocStatus(Status newStatus) {
- _findDocStatus = newStatus;
- }
-
- //////////////////////////////////////////////////////////////////////////////////
- // Collection overrides
-
- std::shared_ptr<Collection> clone() const override {
- return _coll->clone();
- }
-
- SharedCollectionDecorations* getSharedDecorations() const override {
- return _coll->getSharedDecorations();
- }
-
- const NamespaceString& ns() const override {
- return _coll->ns();
- }
-
- Status rename(OperationContext* opCtx, const NamespaceString& nss, bool stayTemp) override {
- MONGO_UNREACHABLE;
- }
-
- RecordId getCatalogId() const override {
- return _coll->getCatalogId();
- }
-
- UUID uuid() const override {
- return _coll->uuid();
- }
-
- const IndexCatalog* getIndexCatalog() const override {
- return _coll->getIndexCatalog();
- }
-
- IndexCatalog* getIndexCatalog() override {
- MONGO_UNREACHABLE;
- }
-
- RecordStore* getRecordStore() const {
- return _coll->getRecordStore();
- }
-
- std::shared_ptr<Ident> getSharedIdent() const override {
- return _coll->getSharedIdent();
- }
-
- const BSONObj getValidatorDoc() const override {
- return _coll->getValidatorDoc();
- }
-
- std::pair<SchemaValidationResult, Status> checkValidation(
- OperationContext* opCtx, const BSONObj& document) const override {
- return _coll->checkValidation(opCtx, document);
- }
-
- bool requiresIdIndex() const override {
- return _coll->requiresIdIndex();
- }
-
- Snapshotted<BSONObj> docFor(OperationContext* opCtx, RecordId loc) const override {
- return _coll->docFor(opCtx, loc);
- }
-
- bool findDoc(OperationContext* opCtx, RecordId loc, Snapshotted<BSONObj>* out) const override {
- uassertStatusOK(_findDocStatus);
- return _coll->findDoc(opCtx, loc, out);
- }
-
- std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
- bool forward = true) const override {
- return _coll->getCursor(opCtx, forward);
- }
-
- bool updateWithDamagesSupported() const override {
- return _coll->updateWithDamagesSupported();
- }
-
- Status truncate(OperationContext* opCtx) override {
- MONGO_UNREACHABLE;
- }
-
- Validator parseValidator(OperationContext* opCtx,
- const BSONObj& validator,
- MatchExpressionParser::AllowedFeatureSet allowedFeatures,
- boost::optional<multiversion::FeatureCompatibilityVersion>
- maxFeatureCompatibilityVersion) const override {
- return _coll->parseValidator(
- opCtx, validator, allowedFeatures, maxFeatureCompatibilityVersion);
- }
-
- void setValidator(OperationContext* opCtx, Validator validator) override {
- MONGO_UNREACHABLE;
- }
-
- Status setValidationLevel(OperationContext* opCtx, ValidationLevelEnum newLevel) override {
- MONGO_UNREACHABLE;
- }
-
- Status setValidationAction(OperationContext* opCtx, ValidationActionEnum newAction) override {
- MONGO_UNREACHABLE;
- }
-
- boost::optional<ValidationLevelEnum> getValidationLevel() const override {
- return _coll->getValidationLevel();
- }
-
- boost::optional<ValidationActionEnum> getValidationAction() const override {
- return _coll->getValidationAction();
- }
-
- Status updateValidator(OperationContext* opCtx,
- BSONObj newValidator,
- boost::optional<ValidationLevelEnum> newLevel,
- boost::optional<ValidationActionEnum> newAction) override {
- MONGO_UNREACHABLE;
- }
-
- Status checkValidatorAPIVersionCompatability(OperationContext* opCtx) const override {
- return _coll->checkValidatorAPIVersionCompatability(opCtx);
- }
-
- bool isChangeStreamPreAndPostImagesEnabled() const override {
- return _coll->isChangeStreamPreAndPostImagesEnabled();
- }
-
- void setChangeStreamPreAndPostImages(OperationContext* opCtx,
- ChangeStreamPreAndPostImagesOptions val) override {
- MONGO_UNREACHABLE;
- }
-
- bool isTemporary() const override {
- return _coll->isTemporary();
- }
-
- boost::optional<bool> getTimeseriesBucketsMayHaveMixedSchemaData() const override {
- return _coll->getTimeseriesBucketsMayHaveMixedSchemaData();
- }
-
- void setTimeseriesBucketsMayHaveMixedSchemaData(OperationContext* opCtx,
- boost::optional<bool> setting) override {
- MONGO_UNREACHABLE;
- }
-
- bool doesTimeseriesBucketsDocContainMixedSchemaData(const BSONObj& bucketsDoc) const override {
- return _coll->doesTimeseriesBucketsDocContainMixedSchemaData(bucketsDoc);
- }
-
- bool getRequiresTimeseriesExtendedRangeSupport() const override {
- return _coll->getRequiresTimeseriesExtendedRangeSupport();
- }
-
- void setRequiresTimeseriesExtendedRangeSupport(OperationContext* opCtx) const override {
- return _coll->setRequiresTimeseriesExtendedRangeSupport(opCtx);
- }
-
- bool isClustered() const override {
- return _coll->isClustered();
- }
-
- boost::optional<ClusteredCollectionInfo> getClusteredInfo() const override {
- return _coll->getClusteredInfo();
- }
-
- void updateClusteredIndexTTLSetting(OperationContext* opCtx,
- boost::optional<int64_t> expireAfterSeconds) override {
- MONGO_UNREACHABLE;
- }
-
- Status updateCappedSize(OperationContext* opCtx,
- boost::optional<long long> newCappedSize,
- boost::optional<long long> newCappedMax) override {
- MONGO_UNREACHABLE;
- }
-
- StatusWith<int> checkMetaDataForIndex(const std::string& indexName,
- const BSONObj& spec) const override {
- return _coll->checkMetaDataForIndex(indexName, spec);
- }
-
- void updateTTLSetting(OperationContext* opCtx,
- StringData idxName,
- long long newExpireSeconds) override {
- MONGO_UNREACHABLE;
- }
-
- void updateHiddenSetting(OperationContext* opCtx, StringData idxName, bool hidden) override {
- MONGO_UNREACHABLE;
- }
-
- void updateUniqueSetting(OperationContext* opCtx, StringData idxName, bool unique) override {
- MONGO_UNREACHABLE;
- }
-
- void updatePrepareUniqueSetting(OperationContext* opCtx,
- StringData idxName,
- bool prepareUnique) override {
- MONGO_UNREACHABLE;
- }
-
- void setIsTemp(OperationContext* opCtx, bool isTemp) override {
- MONGO_UNREACHABLE;
- }
-
- void removeIndex(OperationContext* opCtx, StringData indexName) override {
- MONGO_UNREACHABLE;
- }
-
- Status prepareForIndexBuild(OperationContext* opCtx,
- const IndexDescriptor* spec,
- boost::optional<UUID> buildUUID,
- bool isBackgroundSecondaryBuild) override {
- MONGO_UNREACHABLE;
- }
-
- boost::optional<UUID> getIndexBuildUUID(StringData indexName) const override {
- return _coll->getIndexBuildUUID(indexName);
- }
-
- bool isIndexMultikey(OperationContext* opCtx,
- StringData indexName,
- MultikeyPaths* multikeyPaths,
- int indexOffset = -1) const override {
- return _coll->isIndexMultikey(opCtx, indexName, multikeyPaths, indexOffset);
- }
-
- bool setIndexIsMultikey(OperationContext* opCtx,
- StringData indexName,
- const MultikeyPaths& multikeyPaths,
- int indexOffset = -1) const override {
- return _coll->setIndexIsMultikey(opCtx, indexName, multikeyPaths, indexOffset);
- }
-
- void forceSetIndexIsMultikey(OperationContext* opCtx,
- const IndexDescriptor* desc,
- bool isMultikey,
- const MultikeyPaths& multikeyPaths) const override {
- return _coll->forceSetIndexIsMultikey(opCtx, desc, isMultikey, multikeyPaths);
- }
-
- int getTotalIndexCount() const override {
- return _coll->getTotalIndexCount();
- }
-
- int getCompletedIndexCount() const override {
- return _coll->getCompletedIndexCount();
- }
-
- BSONObj getIndexSpec(StringData indexName) const override {
- return _coll->getIndexSpec(indexName);
- }
-
- void getAllIndexes(std::vector<std::string>* names) const override {
- return _coll->getAllIndexes(names);
- }
-
- void getReadyIndexes(std::vector<std::string>* names) const override {
- return _coll->getReadyIndexes(names);
- }
-
- bool isIndexPresent(StringData indexName) const override {
- return _coll->isIndexPresent(indexName);
- }
-
- bool isIndexReady(StringData indexName) const override {
- return _coll->isIndexReady(indexName);
- }
-
- void replaceMetadata(OperationContext* opCtx,
- std::shared_ptr<BSONCollectionCatalogEntry::MetaData> md) override {
- MONGO_UNREACHABLE;
- }
-
- bool isCapped() const override {
- return _coll->isCapped();
- }
-
- long long getCappedMaxDocs() const override {
- return _coll->getCappedMaxDocs();
- }
-
- long long getCappedMaxSize() const override {
- return _coll->getCappedMaxSize();
- }
-
- long long numRecords(OperationContext* opCtx) const override {
- return _coll->numRecords(opCtx);
- }
-
- long long dataSize(OperationContext* opCtx) const override {
- return _coll->dataSize(opCtx);
- }
-
- bool isEmpty(OperationContext* opCtx) const override {
- return _coll->isEmpty(opCtx);
- }
-
- int averageObjectSize(OperationContext* opCtx) const override {
- return _coll->averageObjectSize(opCtx);
- }
-
- uint64_t getIndexSize(OperationContext* opCtx,
- BSONObjBuilder* details = nullptr,
- int scale = 1) const {
- return _coll->getIndexSize(opCtx, details, scale);
- }
-
- uint64_t getIndexFreeStorageBytes(OperationContext* opCtx) const override {
- return _coll->getIndexFreeStorageBytes(opCtx);
- }
-
- boost::optional<Timestamp> getMinimumVisibleSnapshot() const override {
- return _coll->getMinimumVisibleSnapshot();
- }
-
- void setMinimumVisibleSnapshot(Timestamp name) override {
- MONGO_UNREACHABLE;
- }
-
- boost::optional<TimeseriesOptions> getTimeseriesOptions() const override {
- return _coll->getTimeseriesOptions();
- }
-
- void setTimeseriesOptions(OperationContext* opCtx,
- const TimeseriesOptions& tsOptions) override {
- MONGO_UNREACHABLE;
- }
-
- const CollatorInterface* getDefaultCollator() const override {
- return _coll->getDefaultCollator();
- }
-
- const CollectionOptions& getCollectionOptions() const override {
- return _coll->getCollectionOptions();
- }
-
- StatusWith<std::vector<BSONObj>> addCollationDefaultsToIndexSpecsForCreate(
- OperationContext* opCtx, const std::vector<BSONObj>& indexSpecs) const {
- return _coll->addCollationDefaultsToIndexSpecsForCreate(opCtx, indexSpecs);
- }
-
- void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) override {
- MONGO_UNREACHABLE;
- }
-
- void onDeregisterFromCatalog(OperationContext* opCtx) override {
- MONGO_UNREACHABLE;
- }
-
- void deleteDocument(OperationContext* opCtx,
- StmtId stmtId,
- RecordId loc,
- OpDebug* opDebug,
- bool fromMigrate = false,
- bool noWarn = false,
- StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
- CheckRecordId checkRecordId = CheckRecordId::Off) const override {
- MONGO_UNREACHABLE;
- }
-
- void deleteDocument(OperationContext* opCtx,
- Snapshotted<BSONObj> doc,
- StmtId stmtId,
- RecordId loc,
- OpDebug* opDebug,
- bool fromMigrate = false,
- bool noWarn = false,
- StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
- CheckRecordId checkRecordId = CheckRecordId::Off) const override {
- MONGO_UNREACHABLE;
- }
-
- Status insertDocuments(OperationContext* opCtx,
- std::vector<InsertStatement>::const_iterator begin,
- std::vector<InsertStatement>::const_iterator end,
- OpDebug* opDebug,
- bool fromMigrate = false) const override {
- MONGO_UNREACHABLE;
- }
-
- Status insertDocument(OperationContext* opCtx,
- const InsertStatement& doc,
- OpDebug* opDebug,
- bool fromMigrate = false) const override {
- MONGO_UNREACHABLE;
- }
-
- Status insertDocumentsForOplog(OperationContext* opCtx,
- std::vector<Record>* records,
- const std::vector<Timestamp>& timestamps) const override {
- MONGO_UNREACHABLE;
- }
-
- Status insertDocumentForBulkLoader(OperationContext* opCtx,
- const BSONObj& doc,
- const OnRecordInsertedFn& onRecordInserted) const override {
- MONGO_UNREACHABLE;
- }
-
- RecordId updateDocument(OperationContext* opCtx,
- RecordId oldLocation,
- const Snapshotted<BSONObj>& oldDoc,
- const BSONObj& newDoc,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const override {
- MONGO_UNREACHABLE;
- }
-
- StatusWith<RecordData> updateDocumentWithDamages(OperationContext* opCtx,
- RecordId loc,
- const Snapshotted<RecordData>& oldRec,
- const char* damageSource,
- const mutablebson::DamageVector& damages,
- CollectionUpdateArgs* args) const {
- MONGO_UNREACHABLE;
- }
-
- void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) const override {
- MONGO_UNREACHABLE;
- }
-
- bool getRecordPreImages() const override {
- return _coll->getRecordPreImages();
- }
-
- void setRecordPreImages(OperationContext* opCtx, bool val) override {
- MONGO_UNREACHABLE;
- }
-
- std::vector<std::string> removeInvalidIndexOptions(OperationContext* opCtx) override {
- MONGO_UNREACHABLE;
- }
-
- CappedCallback* getCappedCallback() override {
- MONGO_UNREACHABLE;
- }
-
- const CappedCallback* getCappedCallback() const override {
- return _coll->getCappedCallback();
- }
-
- std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier() const override {
- return _coll->getCappedInsertNotifier();
- }
-
- std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor(
- OperationContext* opCtx,
- const CollectionPtr& yieldableCollection,
- PlanYieldPolicy::YieldPolicy yieldPolicy,
- ScanDirection scanDirection,
- boost::optional<RecordId> resumeAfterRecordId = boost::none) const override {
- return _coll->makePlanExecutor(
- opCtx, yieldableCollection, yieldPolicy, scanDirection, resumeAfterRecordId);
- }
-
- void establishOplogCollectionForLogging(OperationContext* opCtx) const override {
- return _coll->establishOplogCollectionForLogging(opCtx);
- }
-
-private:
- const Collection* _coll;
-
- Status _findDocStatus{Status::OK()};
-};
-
class MigrationChunkClonerSourceLegacyTest : public ShardServerTestFixture {
protected:
MigrationChunkClonerSourceLegacyTest() : ShardServerTestFixture(Options{}.useMockClock(true)) {}
@@ -597,29 +127,9 @@ protected:
if (docs.empty())
return;
- std::deque<BSONObj> docsToInsert;
- std::copy(docs.cbegin(), docs.cend(), std::back_inserter(docsToInsert));
-
- while (!docsToInsert.empty()) {
- std::vector<BSONObj> batchToInsert;
-
- size_t sizeInBatch = 0;
- while (!docsToInsert.empty()) {
- auto next = docsToInsert.front();
- sizeInBatch += next.objsize();
-
- if (sizeInBatch > BSONObjMaxUserSize) {
- break;
- }
-
- batchToInsert.push_back(next);
- docsToInsert.pop_front();
- }
-
- auto response = client()->insertAcknowledged(kNss.ns(), batchToInsert);
- ASSERT_OK(getStatusFromWriteCommandReply(response));
- ASSERT_GT(response["n"].Int(), 0);
- }
+ auto response = client()->insertAcknowledged(kNss.ns(), docs);
+ ASSERT_OK(getStatusFromWriteCommandReply(response));
+ ASSERT_GT(response["n"].Int(), 0);
}
void deleteDocsInShardedCollection(BSONObj query) {
@@ -1191,164 +701,5 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
cloner.cancelClone(operationContext());
}
-TEST_F(MigrationChunkClonerSourceLegacyTest, CloneFetchThatOverflows) {
- const auto kBigSize = 10 * 1024 * 1024;
- const std::vector<BSONObj> contents = {createSizedCollectionDocument(100, kBigSize),
- createSizedCollectionDocument(120, kBigSize),
- createSizedCollectionDocument(199, kBigSize)};
-
- createShardedCollection(contents);
-
- ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- req.setMaxChunkSizeBytes(64 * 1024 * 1024);
-
- MigrationChunkClonerSourceLegacy cloner(req,
- WriteConcernOptions(),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
-
- {
- auto futureStartClone = launchAsync([&]() {
- onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
-
- ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
- futureStartClone.default_timed_get();
- }
-
- // Ensure the initial clone documents are available
- {
- AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
-
- {
- BSONArrayBuilder arrBuilder;
- ASSERT_OK(
- cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
- ASSERT_EQ(1, arrBuilder.arrSize());
-
- const auto arr = arrBuilder.arr();
- ASSERT_BSONOBJ_EQ(contents[0], arr[0].Obj());
- }
-
- {
- BSONArrayBuilder arrBuilder;
- ASSERT_OK(
- cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
- ASSERT_EQ(1, arrBuilder.arrSize());
-
- const auto arr = arrBuilder.arr();
- ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
- }
-
- {
- BSONArrayBuilder arrBuilder;
- ASSERT_OK(
- cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
- ASSERT_EQ(1, arrBuilder.arrSize());
-
- const auto arr = arrBuilder.arr();
- ASSERT_BSONOBJ_EQ(contents[2], arr[0].Obj());
- }
-
- {
- BSONArrayBuilder arrBuilder;
- ASSERT_OK(
- cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
- ASSERT_EQ(0, arrBuilder.arrSize());
- }
- }
-
- auto futureCommit = launchAsync([&]() {
- onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
-
- ASSERT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */));
- futureCommit.default_timed_get();
-}
-
-TEST_F(MigrationChunkClonerSourceLegacyTest, CloneShouldNotCrashWhenNextCloneBatchThrows) {
- const std::vector<BSONObj> contents = {createCollectionDocument(100),
- createCollectionDocument(150),
- createCollectionDocument(199)};
-
- createShardedCollection(contents);
-
- const ShardsvrMoveRange req =
- createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
- MigrationChunkClonerSourceLegacy cloner(req,
- WriteConcernOptions(),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
-
- {
- auto futureStartClone = launchAsync([&]() {
- onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
-
- ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
- futureStartClone.default_timed_get();
- }
-
- {
- AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
-
- {
- auto collWithFault =
- std::make_unique<CollectionWithFault>(autoColl.getCollection().get());
- CollectionPtr collPtrWithFault(collWithFault.get(), CollectionPtr::NoYieldTag());
-
- // Note: findDoc currently doesn't have any interruption points, this test simulates
- // an exception being thrown while it is being called.
- collWithFault->setFindDocStatus({ErrorCodes::Interrupted, "fake interrupt"});
-
- BSONArrayBuilder arrBuilder;
-
- ASSERT_THROWS_CODE(
- cloner.nextCloneBatch(operationContext(), collPtrWithFault, &arrBuilder),
- DBException,
- ErrorCodes::Interrupted);
- ASSERT_EQ(0, arrBuilder.arrSize());
- }
-
- // The first document was lost and returned an error during nextCloneBatch. This would
- // cause the migration destination to abort, but it is still possible for other
- // threads to be in the middle of calling nextCloneBatch and the next nextCloneBatch
- // calls simulate calls from other threads after the first call threw.
- {
- BSONArrayBuilder arrBuilder;
- ASSERT_OK(
- cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-
- const auto arr = arrBuilder.arr();
- ASSERT_EQ(2, arrBuilder.arrSize());
-
- ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
- ASSERT_BSONOBJ_EQ(contents[2], arr[1].Obj());
- }
-
- {
- BSONArrayBuilder arrBuilder;
- ASSERT_OK(
- cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-
- const auto arr = arrBuilder.arr();
- ASSERT_EQ(0, arrBuilder.arrSize());
- }
- }
-
- auto futureCommit = launchAsync([&]() {
- // Simulate destination returning an error.
- onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << false); });
-
- // This is the return response for recvChunkAbort.
- onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
-
- ASSERT_NOT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */));
- futureCommit.default_timed_get();
-}
-
} // namespace
} // namespace mongo