summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2023-01-25 15:22:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-25 16:00:33 +0000
commit6659be6a462ee241f6b0a80a27ae9440bebf9216 (patch)
tree2d8910329816f71876ecc52729f5a1f972da0c98
parent40e94d0e8284279553b5ad24dc6fdbb03e5eee8a (diff)
downloadmongo-6659be6a462ee241f6b0a80a27ae9440bebf9216.tar.gz
SERVER-72619 Refactor and add more testing for migration_chunk_cloner_source changes
(cherry picked from commit 9ad83e70708064f08742cc21ea87e6f9c90a73b1)
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp234
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h218
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp655
3 files changed, 974 insertions, 133 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a5079a4041d..c324a8affe5 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -413,7 +413,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
}
} else {
invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState);
- invariant(_cloneLocs.empty());
+ invariant(_cloneList.hasMore());
}
}
@@ -712,61 +712,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
_jumboChunkCloneState->clonerExec->detachFromOperationContext();
}
-boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
- OperationContext* opCtx, const CollectionPtr& collection) {
- while (true) {
- stdx::unique_lock lk(_mutex);
- invariant(_inProgressReads >= 0);
- RecordId nextRecordId;
- Snapshotted<BSONObj> doc;
-
- _moreDocsCV.wait(lk, [&]() {
- return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
- _inProgressReads == 0;
- });
-
- // One of the following must now be true (corresponding to the three if conditions):
- // 1. There is a document in the overflow set
- // 2. The iterator has not reached the end of the record id set
- // 3. The overflow set is empty, the iterator is at the end, and
- // no threads are holding a document. This condition indicates
- // that there are no more docs to return for the cloning phase.
- if (!_overflowDocs.empty()) {
- doc = std::move(_overflowDocs.front());
- _overflowDocs.pop_front();
- ++_inProgressReads;
- return doc;
- } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
- nextRecordId = *_cloneRecordIdsIter;
- ++_cloneRecordIdsIter;
- ++_inProgressReads;
- } else {
- invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
- return boost::none;
- }
-
- // In order to saturate the disk, the I/O operation must occur without
- // holding the mutex.
- lk.unlock();
- if (collection->findDoc(opCtx, nextRecordId, &doc))
- return doc;
- lk.lock();
- ++_numRecordsPassedOver;
-
- // It is possible that this document is no longer in the collection,
- // in which case, we try again and indicate to other threads that this
- // thread is not holding a document.
- --_inProgressReads;
- _moreDocsCV.notify_one();
- }
-}
-
-void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
- stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads >= 1);
- _overflowDocs.push_back(std::move(doc));
-}
-
void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
@@ -774,18 +719,24 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
- while (auto doc = _getNextDoc(opCtx, collection)) {
- ON_BLOCK_EXIT([&]() {
+ while (true) {
+ int recordsNoLongerExist = 0;
+ auto docInFlight = _cloneList.getNextDoc(opCtx, collection, &recordsNoLongerExist);
+
+ if (recordsNoLongerExist) {
stdx::lock_guard lk(_mutex);
- invariant(_inProgressReads > 0);
- --_inProgressReads;
- _moreDocsCV.notify_one();
- });
+ _numRecordsPassedOver += recordsNoLongerExist;
+ }
+
+ const auto& doc = docInFlight->getDoc();
+ if (!doc) {
+ break;
+ }
// We must always make progress in this method by at least one document because empty
// return indicates there is no more initial clone data.
if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
- _insertOverflowDoc(std::move(*doc));
+ _cloneList.insertOverflowDoc(*doc);
break;
}
@@ -807,7 +758,7 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
// that we take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
(arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
- _insertOverflowDoc(std::move(*doc));
+ _cloneList.insertOverflowDoc(*doc);
break;
}
@@ -818,12 +769,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
arrBuilder->append(doc->value());
ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
-
- // When we reach here, there are no more documents to return to the destination.
- // We therefore need to notify a other threads that maybe sleeping on the condition
- // variable that we are done.
- stdx::lock_guard lk(_mutex);
- _moreDocsCV.notify_one();
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -832,7 +777,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
return static_cast<uint64_t>(BSONObjMaxUserSize);
return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
- _averageObjectSizeForCloneLocs * _cloneLocs.size());
+ _averageObjectSizeForCloneLocs * _cloneList.size());
}
Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
@@ -866,7 +811,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
{
// All clone data must have been drained before starting to fetch the incremental changes.
stdx::unique_lock<Latch> lk(_mutex);
- invariant(_cloneRecordIdsIter == _cloneLocs.end());
+ invariant(!_cloneList.hasMore());
// The "snapshot" for delete and update list must be taken under a single lock. This is to
// ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1045,6 +990,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
try {
BSONObj obj;
RecordId recordId;
+ RecordIdSet recordIdSet;
+
while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) {
Status interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
@@ -1052,20 +999,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
}
if (!isLargeChunk) {
- stdx::lock_guard<Latch> lk(_mutex);
- _cloneLocs.insert(recordId);
+ recordIdSet.insert(recordId);
}
if (++recCount > maxRecsWhenFull) {
isLargeChunk = true;
if (_forceJumbo) {
- _cloneLocs.clear();
+ recordIdSet.clear();
break;
}
}
}
- _cloneRecordIdsIter = _cloneLocs.begin();
+
+ _cloneList.populateList(std::move(recordIdSet));
} catch (DBException& exception) {
exception.addContext("Executor error while scanning for documents belonging to chunk");
throw;
@@ -1185,13 +1132,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"response"_attr = redact(res),
"memoryUsedBytes"_attr = _memoryUsed,
"docsRemainingToClone"_attr =
- _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
+ _cloneList.size() - _numRecordsCloned - _numRecordsPassedOver,
"untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
}
if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
estimateUntransferredSessionsSize == 0) {
- if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
+ if (_cloneList.hasMore() ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
return {ErrorCodes::OperationIncomplete,
@@ -1335,4 +1282,135 @@ MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch()
return _sessionCatalogSource->getNotificationForNewOplog();
}
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::DocumentInFlightWithLock(
+ WithLock lock, MigrationChunkClonerSourceLegacy::CloneList& clonerList)
+ : _inProgressReadToken(
+ std::make_unique<MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken>(
+ lock, clonerList)) {}
+
+void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::setDoc(
+ boost::optional<Snapshotted<BSONObj>> doc) {
+ _doc = std::move(doc);
+}
+
+std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::release() {
+ invariant(_inProgressReadToken);
+
+ return std::make_unique<
+ MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>(
+ std::move(_inProgressReadToken), std::move(_doc));
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::
+ DocumentInFlightWhileNotInLock(
+ std::unique_ptr<CloneList::InProgressReadToken> inProgressReadToken,
+ boost::optional<Snapshotted<BSONObj>> doc)
+ : _inProgressReadToken(std::move(inProgressReadToken)), _doc(std::move(doc)) {}
+
+void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::setDoc(
+ boost::optional<Snapshotted<BSONObj>> doc) {
+ _doc = std::move(doc);
+}
+
+const boost::optional<Snapshotted<BSONObj>>&
+MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::getDoc() {
+ return _doc;
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::InProgressReadToken(
+ WithLock withLock, CloneList& cloneList)
+ : _cloneList(cloneList) {
+ _cloneList._startedOneInProgressRead(withLock);
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::~InProgressReadToken() {
+ _cloneList._finishedOneInProgressRead();
+}
+
+MigrationChunkClonerSourceLegacy::CloneList::CloneList() {
+ _recordIdsIter = _recordIds.begin();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::populateList(RecordIdSet recordIds) {
+ stdx::lock_guard lk(_mutex);
+ _recordIds = std::move(recordIds);
+ _recordIdsIter = _recordIds.begin();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::insertOverflowDoc(Snapshotted<BSONObj> doc) {
+ stdx::lock_guard lk(_mutex);
+ invariant(_inProgressReads >= 1);
+ _overflowDocs.push_back(std::move(doc));
+}
+
+bool MigrationChunkClonerSourceLegacy::CloneList::hasMore() const {
+ stdx::lock_guard lk(_mutex);
+ return _recordIdsIter != _recordIds.cend() && _inProgressReads > 0;
+}
+
+std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
+MigrationChunkClonerSourceLegacy::CloneList::getNextDoc(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ int* numRecordsNoLongerExist) {
+ while (true) {
+ stdx::unique_lock lk(_mutex);
+ invariant(_inProgressReads >= 0);
+ RecordId nextRecordId;
+
+ opCtx->waitForConditionOrInterrupt(_moreDocsCV, lk, [&]() {
+ return _recordIdsIter != _recordIds.end() || !_overflowDocs.empty() ||
+ _inProgressReads == 0;
+ });
+
+ DocumentInFlightWithLock docInFlight(lk, *this);
+
+ // One of the following must now be true (corresponding to the three if conditions):
+ // 1. There is a document in the overflow set
+ // 2. The iterator has not reached the end of the record id set
+ // 3. The overflow set is empty, the iterator is at the end, and
+ // no threads are holding a document. This condition indicates
+ // that there are no more docs to return for the cloning phase.
+ if (!_overflowDocs.empty()) {
+ docInFlight.setDoc(std::move(_overflowDocs.front()));
+ _overflowDocs.pop_front();
+ return docInFlight.release();
+ } else if (_recordIdsIter != _recordIds.end()) {
+ nextRecordId = *_recordIdsIter;
+ ++_recordIdsIter;
+ } else {
+ return docInFlight.release();
+ }
+
+ lk.unlock();
+
+ auto docInFlightWhileNotLocked = docInFlight.release();
+
+ Snapshotted<BSONObj> doc;
+ if (collection->findDoc(opCtx, nextRecordId, &doc)) {
+ docInFlightWhileNotLocked->setDoc(std::move(doc));
+ return docInFlightWhileNotLocked;
+ }
+
+ if (numRecordsNoLongerExist) {
+ (*numRecordsNoLongerExist)++;
+ }
+ }
+}
+
+size_t MigrationChunkClonerSourceLegacy::CloneList::size() const {
+ stdx::unique_lock lk(_mutex);
+ return _recordIds.size();
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::_startedOneInProgressRead(WithLock) {
+ _inProgressReads++;
+}
+
+void MigrationChunkClonerSourceLegacy::CloneList::_finishedOneInProgressRead() {
+ stdx::lock_guard lk(_mutex);
+ _inProgressReads--;
+ _moreDocsCV.notify_one();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 081ffc799ba..803c1f512af 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -226,6 +226,168 @@ private:
friend class LogOpForShardingHandler;
friend class LogTransactionOperationsForShardingHandler;
+ using RecordIdSet = std::set<RecordId>;
+
+ /**
+ * This is responsible for all the logic revolving around handling documents that needs to be
+ * cloned.
+ *
+ * This class is multithread-safe.
+ */
+ class CloneList {
+ public:
+ /**
+ * Simple container that increments the given counter when this is constructed and
+ * decrements it when it is destroyed. User of this class is responsible for holding
+ * necessary mutexes when counter is being modified.
+ */
+ class InProgressReadToken {
+ public:
+ InProgressReadToken(WithLock, CloneList& cloneList);
+ InProgressReadToken(const InProgressReadToken&) = delete;
+ InProgressReadToken(InProgressReadToken&&) = default;
+
+ ~InProgressReadToken();
+
+ private:
+ CloneList& _cloneList;
+ };
+
+ /**
+ * Container for a document that can be added to the nextCloneBatch call. As long as
+ * instances of this object exist, it will prevent getNextDoc from prematurely returning
+ * an empty response (which means there are no more docs left to clone).
+ *
+ * This assumes that _mutex is not being held when it is destroyed.
+ */
+ class DocumentInFlightWhileNotInLock {
+ public:
+ DocumentInFlightWhileNotInLock(std::unique_ptr<InProgressReadToken> inProgressReadToken,
+ boost::optional<Snapshotted<BSONObj>> doc);
+ DocumentInFlightWhileNotInLock(const DocumentInFlightWhileNotInLock&) = delete;
+ DocumentInFlightWhileNotInLock(DocumentInFlightWhileNotInLock&&) = default;
+
+ void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
+ const boost::optional<Snapshotted<BSONObj>>& getDoc();
+
+ private:
+ std::unique_ptr<InProgressReadToken> _inProgressReadToken;
+ boost::optional<Snapshotted<BSONObj>> _doc;
+ };
+
+ /**
+ * A variant of the DocumentInFlightWhileNotInLock where the _mutex should be held while it
+ * has a document contained within it.
+ */
+ class DocumentInFlightWithLock {
+ public:
+ DocumentInFlightWithLock(WithLock, CloneList& clonerList);
+ DocumentInFlightWithLock(const DocumentInFlightWithLock&) = delete;
+ DocumentInFlightWithLock(DocumentInFlightWithLock&&) = default;
+
+ void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
+
+ /**
+ * Releases the contained document. Can only be called once for the entire lifetime
+ * of this object.
+ */
+ std::unique_ptr<DocumentInFlightWhileNotInLock> release();
+
+ private:
+ std::unique_ptr<InProgressReadToken> _inProgressReadToken;
+ boost::optional<Snapshotted<BSONObj>> _doc;
+ };
+
+ CloneList();
+
+ /**
+ * Overwrites the list of record ids to clone.
+ */
+ void populateList(RecordIdSet recordIds);
+
+ /**
+ * Returns a document to clone. If there are no more documents left to clone,
+ * DocumentInFlightWhileNotInLock::getDoc will return boost::none.
+ *
+ * numRecordsNoLonger exists is an optional parameter that can be used to track
+ * the number of recordIds encountered that refers to a document that no longer
+ * exists.
+ */
+ std::unique_ptr<DocumentInFlightWhileNotInLock> getNextDoc(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ int* numRecordsNoLongerExist);
+
+ /**
+ * Put back a document previously obtained from this CloneList instance to the overflow
+ * pool.
+ */
+ void insertOverflowDoc(Snapshotted<BSONObj> doc);
+
+ /**
+ * Returns true if there are more documents to clone.
+ */
+ bool hasMore() const;
+
+ /**
+ * Returns the size of the populated record ids.
+ */
+ size_t size() const;
+
+ private:
+ /**
+ * Increments the counter for inProgressReads.
+ */
+ void _startedOneInProgressRead(WithLock);
+
+ /**
+ * Decrements the counter for inProgressReads.
+ */
+ void _finishedOneInProgressRead();
+
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSource::CloneList::_mutex");
+
+ RecordIdSet _recordIds;
+
+ // This iterator is a pointer into the _recordIds set. It allows concurrent access to
+ // the _recordIds set by allowing threads servicing _migrateClone requests to do the
+ // following:
+ // 1. Acquire mutex "_mutex" above.
+ // 2. Copy *_recordIdsIter into its local stack frame.
+ // 3. Increment _recordIdsIter
+ // 4. Unlock "_mutex."
+ // 5. Do the I/O to fetch the document corresponding to this record Id.
+ //
+ // The purpose of this algorithm, is to allow different threads to concurrently start I/O
+ // jobs in order to more fully saturate the disk.
+ //
+ // One issue with this algorithm, is that only 16MB worth of documents can be returned in
+ // response to a _migrateClone request. But, the thread does not know the size of a
+ // document until it does the I/O. At which point, if the document does not fit in the
+ // response to _migrateClone request the document must be made available to a different
+ // thread servicing a _migrateClone request. To solve this problem, the thread adds the
+ // document to the below _overflowDocs deque.
+ RecordIdSet::iterator _recordIdsIter;
+
+ // This deque stores all documents that must be sent to the destination, but could not fit
+ // in the response to a particular _migrateClone request.
+ std::deque<Snapshotted<BSONObj>> _overflowDocs;
+
+ // This integer represents how many documents are being "held" by threads servicing
+ // _migrateClone requests. Any document that is "held" by a thread may be added to the
+ // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
+ // This integer is necessary because it gives us a condition on when all documents to be
+ // sent to the destination have been exhausted.
+ //
+ // If (_recordIdsIter == _recordIds.end() && _overflowDocs.empty() &&
+ // _inProgressReads == 0) then all documents have been returned to the destination.
+ RecordIdSet::size_type _inProgressReads = 0;
+
+ // This condition variable allows us to wait on the following condition:
+ // Either we're done and the above condition is satisfied, or there is some document to
+ // return.
+ stdx::condition_variable _moreDocsCV;
+ };
+
// Represents the states in which the cloner can be
enum State { kNew, kCloning, kDone };
@@ -263,15 +425,6 @@ private:
Status _storeCurrentLocs(OperationContext* opCtx);
/**
- * Returns boost::none if there are no more documents to get.
- * Increments _inProgressReads if and only if return value is not none.
- */
- boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx,
- const CollectionPtr& collection);
-
- void _insertOverflowDoc(Snapshotted<BSONObj> doc);
-
- /**
* Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
* part of session migration.
*/
@@ -359,49 +512,10 @@ private:
// The current state of the cloner
State _state{kNew};
- // List of record ids that needs to be transferred (initial clone)
- std::set<RecordId> _cloneLocs;
-
- // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to
- // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the
- // following:
- // 1. Acquire mutex "_mutex" above.
- // 2. Copy *_cloneRecordIdsIter into its local stack frame.
- // 3. Increment _cloneRecordIdsIter
- // 4. Unlock "_mutex."
- // 5. Do the I/O to fetch the document corresponding to this record Id.
- //
- // The purpose of this algorithm, is to allow different threads to concurrently start I/O jobs
- // in order to more fully saturate the disk.
- //
- // One issue with this algorithm, is that only 16MB worth of documents can be returned in
- // response to a _migrateClone request. But, the thread does not know the size of a document
- // until it does the I/O. At which point, if the document does not fit in the response to
- // _migrateClone request the document must be made available to a different thread servicing a
- // _migrateClone request. To solve this problem, the thread adds the document
- // to the below _overflowDocs deque.
- std::set<RecordId>::iterator _cloneRecordIdsIter;
-
- // This deque stores all documents that must be sent to the destination, but could not fit
- // in the response to a particular _migrateClone request.
- std::deque<Snapshotted<BSONObj>> _overflowDocs;
-
- // This integer represents how many documents are being "held" by threads servicing
- // _migrateClone requests. Any document that is "held" by a thread may be added to the
- // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
- // This integer is necessary because it gives us a condition on when all documents to be sent
- // to the destination have been exhausted.
- //
- // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads
- // == 0) then all documents have been returned to the destination.
- decltype(_cloneLocs.size()) _inProgressReads = 0;
-
- // This condition variable allows us to wait on the following condition:
- // Either we're done and the above condition is satisfied, or there is some document to
- // return.
- stdx::condition_variable _moreDocsCV;
- decltype(_cloneLocs.size()) _numRecordsCloned{0};
- decltype(_cloneLocs.size()) _numRecordsPassedOver{0};
+ CloneList _cloneList;
+
+ RecordIdSet::size_type _numRecordsCloned{0};
+ RecordIdSet::size_type _numRecordsPassedOver{0};
// The estimated average object size during the clone phase. Used for buffer size
// pre-allocation (initial clone).
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 91e1b4a21bc..c062c8e2fe3 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -66,6 +66,476 @@ const ConnectionString kRecipientConnStr =
HostAndPort("RecipientHost2:1234"),
HostAndPort("RecipientHost3:1234")});
+class CollectionWithFault : public Collection {
+public:
+ CollectionWithFault(const Collection* originalCollection) : _coll(originalCollection) {}
+
+ void setFindDocStatus(Status newStatus) {
+ _findDocStatus = newStatus;
+ }
+
+ //////////////////////////////////////////////////////////////////////////////////
+ // Collection overrides
+
+ std::shared_ptr<Collection> clone() const override {
+ return _coll->clone();
+ }
+
+ SharedCollectionDecorations* getSharedDecorations() const override {
+ return _coll->getSharedDecorations();
+ }
+
+ const NamespaceString& ns() const override {
+ return _coll->ns();
+ }
+
+ Status rename(OperationContext* opCtx, const NamespaceString& nss, bool stayTemp) override {
+ MONGO_UNREACHABLE;
+ }
+
+ RecordId getCatalogId() const override {
+ return _coll->getCatalogId();
+ }
+
+ UUID uuid() const override {
+ return _coll->uuid();
+ }
+
+ const IndexCatalog* getIndexCatalog() const override {
+ return _coll->getIndexCatalog();
+ }
+
+ IndexCatalog* getIndexCatalog() override {
+ MONGO_UNREACHABLE;
+ }
+
+ RecordStore* getRecordStore() const {
+ return _coll->getRecordStore();
+ }
+
+ std::shared_ptr<Ident> getSharedIdent() const override {
+ return _coll->getSharedIdent();
+ }
+
+ const BSONObj getValidatorDoc() const override {
+ return _coll->getValidatorDoc();
+ }
+
+ std::pair<SchemaValidationResult, Status> checkValidation(
+ OperationContext* opCtx, const BSONObj& document) const override {
+ return _coll->checkValidation(opCtx, document);
+ }
+
+ bool requiresIdIndex() const override {
+ return _coll->requiresIdIndex();
+ }
+
+ Snapshotted<BSONObj> docFor(OperationContext* opCtx, RecordId loc) const override {
+ return _coll->docFor(opCtx, loc);
+ }
+
+ bool findDoc(OperationContext* opCtx, RecordId loc, Snapshotted<BSONObj>* out) const override {
+ uassertStatusOK(_findDocStatus);
+ return _coll->findDoc(opCtx, loc, out);
+ }
+
+ std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
+ bool forward = true) const override {
+ return _coll->getCursor(opCtx, forward);
+ }
+
+ bool updateWithDamagesSupported() const override {
+ return _coll->updateWithDamagesSupported();
+ }
+
+ Status truncate(OperationContext* opCtx) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Validator parseValidator(OperationContext* opCtx,
+ const BSONObj& validator,
+ MatchExpressionParser::AllowedFeatureSet allowedFeatures,
+ boost::optional<multiversion::FeatureCompatibilityVersion>
+ maxFeatureCompatibilityVersion) const override {
+ return _coll->parseValidator(
+ opCtx, validator, allowedFeatures, maxFeatureCompatibilityVersion);
+ }
+
+ void setValidator(OperationContext* opCtx, Validator validator) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status setValidationLevel(OperationContext* opCtx, ValidationLevelEnum newLevel) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status setValidationAction(OperationContext* opCtx, ValidationActionEnum newAction) override {
+ MONGO_UNREACHABLE;
+ }
+
+ boost::optional<ValidationLevelEnum> getValidationLevel() const override {
+ return _coll->getValidationLevel();
+ }
+
+ boost::optional<ValidationActionEnum> getValidationAction() const override {
+ return _coll->getValidationAction();
+ }
+
+ Status updateValidator(OperationContext* opCtx,
+ BSONObj newValidator,
+ boost::optional<ValidationLevelEnum> newLevel,
+ boost::optional<ValidationActionEnum> newAction) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status checkValidatorAPIVersionCompatability(OperationContext* opCtx) const override {
+ return _coll->checkValidatorAPIVersionCompatability(opCtx);
+ }
+
+ bool isChangeStreamPreAndPostImagesEnabled() const override {
+ return _coll->isChangeStreamPreAndPostImagesEnabled();
+ }
+
+ void setChangeStreamPreAndPostImages(OperationContext* opCtx,
+ ChangeStreamPreAndPostImagesOptions val) override {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isTemporary() const override {
+ return _coll->isTemporary();
+ }
+
+ boost::optional<bool> getTimeseriesBucketsMayHaveMixedSchemaData() const override {
+ return _coll->getTimeseriesBucketsMayHaveMixedSchemaData();
+ }
+
+ void setTimeseriesBucketsMayHaveMixedSchemaData(OperationContext* opCtx,
+ boost::optional<bool> setting) override {
+ MONGO_UNREACHABLE;
+ }
+
+ bool doesTimeseriesBucketsDocContainMixedSchemaData(const BSONObj& bucketsDoc) const override {
+ return _coll->doesTimeseriesBucketsDocContainMixedSchemaData(bucketsDoc);
+ }
+
+ bool getRequiresTimeseriesExtendedRangeSupport() const override {
+ return _coll->getRequiresTimeseriesExtendedRangeSupport();
+ }
+
+ void setRequiresTimeseriesExtendedRangeSupport(OperationContext* opCtx) const override {
+ return _coll->setRequiresTimeseriesExtendedRangeSupport(opCtx);
+ }
+
+ bool isClustered() const override {
+ return _coll->isClustered();
+ }
+
+ boost::optional<ClusteredCollectionInfo> getClusteredInfo() const override {
+ return _coll->getClusteredInfo();
+ }
+
+ void updateClusteredIndexTTLSetting(OperationContext* opCtx,
+ boost::optional<int64_t> expireAfterSeconds) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status updateCappedSize(OperationContext* opCtx,
+ boost::optional<long long> newCappedSize,
+ boost::optional<long long> newCappedMax) override {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<int> checkMetaDataForIndex(const std::string& indexName,
+ const BSONObj& spec) const override {
+ return _coll->checkMetaDataForIndex(indexName, spec);
+ }
+
+ void updateTTLSetting(OperationContext* opCtx,
+ StringData idxName,
+ long long newExpireSeconds) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void updateHiddenSetting(OperationContext* opCtx, StringData idxName, bool hidden) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void updateUniqueSetting(OperationContext* opCtx, StringData idxName, bool unique) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void updatePrepareUniqueSetting(OperationContext* opCtx,
+ StringData idxName,
+ bool prepareUnique) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void setIsTemp(OperationContext* opCtx, bool isTemp) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void removeIndex(OperationContext* opCtx, StringData indexName) override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status prepareForIndexBuild(OperationContext* opCtx,
+ const IndexDescriptor* spec,
+ boost::optional<UUID> buildUUID,
+ bool isBackgroundSecondaryBuild) override {
+ MONGO_UNREACHABLE;
+ }
+
+ boost::optional<UUID> getIndexBuildUUID(StringData indexName) const override {
+ return _coll->getIndexBuildUUID(indexName);
+ }
+
+ bool isIndexMultikey(OperationContext* opCtx,
+ StringData indexName,
+ MultikeyPaths* multikeyPaths,
+ int indexOffset = -1) const override {
+ return _coll->isIndexMultikey(opCtx, indexName, multikeyPaths, indexOffset);
+ }
+
+ bool setIndexIsMultikey(OperationContext* opCtx,
+ StringData indexName,
+ const MultikeyPaths& multikeyPaths,
+ int indexOffset = -1) const override {
+ return _coll->setIndexIsMultikey(opCtx, indexName, multikeyPaths, indexOffset);
+ }
+
+ void forceSetIndexIsMultikey(OperationContext* opCtx,
+ const IndexDescriptor* desc,
+ bool isMultikey,
+ const MultikeyPaths& multikeyPaths) const override {
+ return _coll->forceSetIndexIsMultikey(opCtx, desc, isMultikey, multikeyPaths);
+ }
+
+ int getTotalIndexCount() const override {
+ return _coll->getTotalIndexCount();
+ }
+
+ int getCompletedIndexCount() const override {
+ return _coll->getCompletedIndexCount();
+ }
+
+ BSONObj getIndexSpec(StringData indexName) const override {
+ return _coll->getIndexSpec(indexName);
+ }
+
+ void getAllIndexes(std::vector<std::string>* names) const override {
+ return _coll->getAllIndexes(names);
+ }
+
+ void getReadyIndexes(std::vector<std::string>* names) const override {
+ return _coll->getReadyIndexes(names);
+ }
+
+ bool isIndexPresent(StringData indexName) const override {
+ return _coll->isIndexPresent(indexName);
+ }
+
+ bool isIndexReady(StringData indexName) const override {
+ return _coll->isIndexReady(indexName);
+ }
+
+ void replaceMetadata(OperationContext* opCtx,
+ std::shared_ptr<BSONCollectionCatalogEntry::MetaData> md) override {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isCapped() const override {
+ return _coll->isCapped();
+ }
+
+ long long getCappedMaxDocs() const override {
+ return _coll->getCappedMaxDocs();
+ }
+
+ long long getCappedMaxSize() const override {
+ return _coll->getCappedMaxSize();
+ }
+
+ long long numRecords(OperationContext* opCtx) const override {
+ return _coll->numRecords(opCtx);
+ }
+
+ long long dataSize(OperationContext* opCtx) const override {
+ return _coll->dataSize(opCtx);
+ }
+
+ bool isEmpty(OperationContext* opCtx) const override {
+ return _coll->isEmpty(opCtx);
+ }
+
+ int averageObjectSize(OperationContext* opCtx) const override {
+ return _coll->averageObjectSize(opCtx);
+ }
+
+ uint64_t getIndexSize(OperationContext* opCtx,
+ BSONObjBuilder* details = nullptr,
+ int scale = 1) const {
+ return _coll->getIndexSize(opCtx, details, scale);
+ }
+
+ uint64_t getIndexFreeStorageBytes(OperationContext* opCtx) const override {
+ return _coll->getIndexFreeStorageBytes(opCtx);
+ }
+
+ boost::optional<Timestamp> getMinimumVisibleSnapshot() const override {
+ return _coll->getMinimumVisibleSnapshot();
+ }
+
+ void setMinimumVisibleSnapshot(Timestamp name) override {
+ MONGO_UNREACHABLE;
+ }
+
+ boost::optional<TimeseriesOptions> getTimeseriesOptions() const override {
+ return _coll->getTimeseriesOptions();
+ }
+
+ void setTimeseriesOptions(OperationContext* opCtx,
+ const TimeseriesOptions& tsOptions) override {
+ MONGO_UNREACHABLE;
+ }
+
+ const CollatorInterface* getDefaultCollator() const override {
+ return _coll->getDefaultCollator();
+ }
+
+ const CollectionOptions& getCollectionOptions() const override {
+ return _coll->getCollectionOptions();
+ }
+
+ StatusWith<std::vector<BSONObj>> addCollationDefaultsToIndexSpecsForCreate(
+ OperationContext* opCtx, const std::vector<BSONObj>& indexSpecs) const {
+ return _coll->addCollationDefaultsToIndexSpecsForCreate(opCtx, indexSpecs);
+ }
+
+ void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void onDeregisterFromCatalog(OperationContext* opCtx) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void deleteDocument(OperationContext* opCtx,
+ StmtId stmtId,
+ RecordId loc,
+ OpDebug* opDebug,
+ bool fromMigrate = false,
+ bool noWarn = false,
+ StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
+ CheckRecordId checkRecordId = CheckRecordId::Off) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ void deleteDocument(OperationContext* opCtx,
+ Snapshotted<BSONObj> doc,
+ StmtId stmtId,
+ RecordId loc,
+ OpDebug* opDebug,
+ bool fromMigrate = false,
+ bool noWarn = false,
+ StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
+ CheckRecordId checkRecordId = CheckRecordId::Off) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status insertDocuments(OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ OpDebug* opDebug,
+ bool fromMigrate = false) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status insertDocument(OperationContext* opCtx,
+ const InsertStatement& doc,
+ OpDebug* opDebug,
+ bool fromMigrate = false) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status insertDocumentsForOplog(OperationContext* opCtx,
+ std::vector<Record>* records,
+ const std::vector<Timestamp>& timestamps) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Status insertDocumentForBulkLoader(OperationContext* opCtx,
+ const BSONObj& doc,
+ const OnRecordInsertedFn& onRecordInserted) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ RecordId updateDocument(OperationContext* opCtx,
+ RecordId oldLocation,
+ const Snapshotted<BSONObj>& oldDoc,
+ const BSONObj& newDoc,
+ bool indexesAffected,
+ OpDebug* opDebug,
+ CollectionUpdateArgs* args) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<RecordData> updateDocumentWithDamages(OperationContext* opCtx,
+ RecordId loc,
+ const Snapshotted<RecordData>& oldRec,
+ const char* damageSource,
+ const mutablebson::DamageVector& damages,
+ CollectionUpdateArgs* args) const {
+ MONGO_UNREACHABLE;
+ }
+
+ void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ bool getRecordPreImages() const override {
+ return _coll->getRecordPreImages();
+ }
+
+ void setRecordPreImages(OperationContext* opCtx, bool val) override {
+ MONGO_UNREACHABLE;
+ }
+
+ std::vector<std::string> removeInvalidIndexOptions(OperationContext* opCtx) override {
+ MONGO_UNREACHABLE;
+ }
+
+ CappedCallback* getCappedCallback() override {
+ MONGO_UNREACHABLE;
+ }
+
+ const CappedCallback* getCappedCallback() const override {
+ return _coll->getCappedCallback();
+ }
+
+ std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier() const override {
+ return _coll->getCappedInsertNotifier();
+ }
+
+ std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor(
+ OperationContext* opCtx,
+ const CollectionPtr& yieldableCollection,
+ PlanYieldPolicy::YieldPolicy yieldPolicy,
+ ScanDirection scanDirection,
+ boost::optional<RecordId> resumeAfterRecordId = boost::none) const override {
+ return _coll->makePlanExecutor(
+ opCtx, yieldableCollection, yieldPolicy, scanDirection, resumeAfterRecordId);
+ }
+
+ void establishOplogCollectionForLogging(OperationContext* opCtx) const override {
+ return _coll->establishOplogCollectionForLogging(opCtx);
+ }
+
+private:
+ const Collection* _coll;
+
+ Status _findDocStatus{Status::OK()};
+};
+
class MigrationChunkClonerSourceLegacyTest : public ShardServerTestFixture {
protected:
MigrationChunkClonerSourceLegacyTest() : ShardServerTestFixture(Options{}.useMockClock(true)) {}
@@ -127,9 +597,29 @@ protected:
if (docs.empty())
return;
- auto response = client()->insertAcknowledged(kNss.ns(), docs);
- ASSERT_OK(getStatusFromWriteCommandReply(response));
- ASSERT_GT(response["n"].Int(), 0);
+ std::deque<BSONObj> docsToInsert;
+ std::copy(docs.cbegin(), docs.cend(), std::back_inserter(docsToInsert));
+
+ while (!docsToInsert.empty()) {
+ std::vector<BSONObj> batchToInsert;
+
+ size_t sizeInBatch = 0;
+ while (!docsToInsert.empty()) {
+ auto next = docsToInsert.front();
+ sizeInBatch += next.objsize();
+
+ if (sizeInBatch > BSONObjMaxUserSize) {
+ break;
+ }
+
+ batchToInsert.push_back(next);
+ docsToInsert.pop_front();
+ }
+
+ auto response = client()->insertAcknowledged(kNss.ns(), batchToInsert);
+ ASSERT_OK(getStatusFromWriteCommandReply(response));
+ ASSERT_GT(response["n"].Int(), 0);
+ }
}
void deleteDocsInShardedCollection(BSONObj query) {
@@ -701,5 +1191,164 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
cloner.cancelClone(operationContext());
}
+TEST_F(MigrationChunkClonerSourceLegacyTest, CloneFetchThatOverflows) {
+ const auto kBigSize = 10 * 1024 * 1024;
+ const std::vector<BSONObj> contents = {createSizedCollectionDocument(100, kBigSize),
+ createSizedCollectionDocument(120, kBigSize),
+ createSizedCollectionDocument(199, kBigSize)};
+
+ createShardedCollection(contents);
+
+ ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ req.setMaxChunkSizeBytes(64 * 1024 * 1024);
+
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
+
+ {
+ auto futureStartClone = launchAsync([&]() {
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+
+ ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
+ futureStartClone.default_timed_get();
+ }
+
+ // Ensure the initial clone documents are available
+ {
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(1, arrBuilder.arrSize());
+
+ const auto arr = arrBuilder.arr();
+ ASSERT_BSONOBJ_EQ(contents[0], arr[0].Obj());
+ }
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(1, arrBuilder.arrSize());
+
+ const auto arr = arrBuilder.arr();
+ ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
+ }
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(1, arrBuilder.arrSize());
+
+ const auto arr = arrBuilder.arr();
+ ASSERT_BSONOBJ_EQ(contents[2], arr[0].Obj());
+ }
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+ ASSERT_EQ(0, arrBuilder.arrSize());
+ }
+ }
+
+ auto futureCommit = launchAsync([&]() {
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+
+ ASSERT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */));
+ futureCommit.default_timed_get();
+}
+
+TEST_F(MigrationChunkClonerSourceLegacyTest, CloneShouldNotCrashWhenNextCloneBatchThrows) {
+ const std::vector<BSONObj> contents = {createCollectionDocument(100),
+ createCollectionDocument(150),
+ createCollectionDocument(199)};
+
+ createShardedCollection(contents);
+
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
+
+ {
+ auto futureStartClone = launchAsync([&]() {
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+
+ ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
+ futureStartClone.default_timed_get();
+ }
+
+ {
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
+
+ {
+ auto collWithFault =
+ std::make_unique<CollectionWithFault>(autoColl.getCollection().get());
+ CollectionPtr collPtrWithFault(collWithFault.get(), CollectionPtr::NoYieldTag());
+
+ // Note: findDoc currently doesn't have any interruption points, this test simulates
+ // an exception being thrown while it is being called.
+ collWithFault->setFindDocStatus({ErrorCodes::Interrupted, "fake interrupt"});
+
+ BSONArrayBuilder arrBuilder;
+
+ ASSERT_THROWS_CODE(
+ cloner.nextCloneBatch(operationContext(), collPtrWithFault, &arrBuilder),
+ DBException,
+ ErrorCodes::Interrupted);
+ ASSERT_EQ(0, arrBuilder.arrSize());
+ }
+
+ // The first document was lost and returned an error during nextCloneBatch. This would
+ // cause the migration destination to abort, but it is still possible for other
+ // threads to be in the middle of calling nextCloneBatch and the next nextCloneBatch
+ // calls simulate calls from other threads after the first call threw.
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+
+ const auto arr = arrBuilder.arr();
+ ASSERT_EQ(2, arrBuilder.arrSize());
+
+ ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
+ ASSERT_BSONOBJ_EQ(contents[2], arr[1].Obj());
+ }
+
+ {
+ BSONArrayBuilder arrBuilder;
+ ASSERT_OK(
+ cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
+
+ const auto arr = arrBuilder.arr();
+ ASSERT_EQ(0, arrBuilder.arrSize());
+ }
+ }
+
+ auto futureCommit = launchAsync([&]() {
+ // Simulate destination returning an error.
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << false); });
+
+ // This is the return response for recvChunkAbort.
+ onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
+ });
+
+ ASSERT_NOT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */));
+ futureCommit.default_timed_get();
+}
+
} // namespace
} // namespace mongo