diff options
author | Randolph Tan <randolph@10gen.com> | 2023-01-25 15:22:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-25 16:00:33 +0000 |
commit | 6659be6a462ee241f6b0a80a27ae9440bebf9216 (patch) | |
tree | 2d8910329816f71876ecc52729f5a1f972da0c98 | |
parent | 40e94d0e8284279553b5ad24dc6fdbb03e5eee8a (diff) | |
download | mongo-6659be6a462ee241f6b0a80a27ae9440bebf9216.tar.gz |
SERVER-72619 Refactor and add more testing for migration_chunk_cloner_source changes
(cherry picked from commit 9ad83e70708064f08742cc21ea87e6f9c90a73b1)
3 files changed, 974 insertions, 133 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index a5079a4041d..c324a8affe5 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -413,7 +413,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte } } else { invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState); - invariant(_cloneLocs.empty()); + invariant(_cloneList.hasMore()); } } @@ -712,61 +712,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon _jumboChunkCloneState->clonerExec->detachFromOperationContext(); } -boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc( - OperationContext* opCtx, const CollectionPtr& collection) { - while (true) { - stdx::unique_lock lk(_mutex); - invariant(_inProgressReads >= 0); - RecordId nextRecordId; - Snapshotted<BSONObj> doc; - - _moreDocsCV.wait(lk, [&]() { - return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() || - _inProgressReads == 0; - }); - - // One of the following must now be true (corresponding to the three if conditions): - // 1. There is a document in the overflow set - // 2. The iterator has not reached the end of the record id set - // 3. The overflow set is empty, the iterator is at the end, and - // no threads are holding a document. This condition indicates - // that there are no more docs to return for the cloning phase. - if (!_overflowDocs.empty()) { - doc = std::move(_overflowDocs.front()); - _overflowDocs.pop_front(); - ++_inProgressReads; - return doc; - } else if (_cloneRecordIdsIter != _cloneLocs.end()) { - nextRecordId = *_cloneRecordIdsIter; - ++_cloneRecordIdsIter; - ++_inProgressReads; - } else { - invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size()); - return boost::none; - } - - // In order to saturate the disk, the I/O operation must occur without - // holding the mutex. - lk.unlock(); - if (collection->findDoc(opCtx, nextRecordId, &doc)) - return doc; - lk.lock(); - ++_numRecordsPassedOver; - - // It is possible that this document is no longer in the collection, - // in which case, we try again and indicate to other threads that this - // thread is not holding a document. - --_inProgressReads; - _moreDocsCV.notify_one(); - } -} - -void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) { - stdx::lock_guard lk(_mutex); - invariant(_inProgressReads >= 1); - _overflowDocs.push_back(std::move(doc)); -} - void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx, const CollectionPtr& collection, BSONArrayBuilder* arrBuilder) { @@ -774,18 +719,24 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); - while (auto doc = _getNextDoc(opCtx, collection)) { - ON_BLOCK_EXIT([&]() { + while (true) { + int recordsNoLongerExist = 0; + auto docInFlight = _cloneList.getNextDoc(opCtx, collection, &recordsNoLongerExist); + + if (recordsNoLongerExist) { stdx::lock_guard lk(_mutex); - invariant(_inProgressReads > 0); - --_inProgressReads; - _moreDocsCV.notify_one(); - }); + _numRecordsPassedOver += recordsNoLongerExist; + } + + const auto& doc = docInFlight->getDoc(); + if (!doc) { + break; + } // We must always make progress in this method by at least one document because empty // return indicates there is no more initial clone data. if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) { - _insertOverflowDoc(std::move(*doc)); + _cloneList.insertOverflowDoc(*doc); break; } @@ -807,7 +758,7 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon // that we take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) { - _insertOverflowDoc(std::move(*doc)); + _cloneList.insertOverflowDoc(*doc); break; } @@ -818,12 +769,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon arrBuilder->append(doc->value()); ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); } - - // When we reach here, there are no more documents to return to the destination. - // We therefore need to notify a other threads that maybe sleeping on the condition - // variable that we are done. - stdx::lock_guard lk(_mutex); - _moreDocsCV.notify_one(); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -832,7 +777,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { return static_cast<uint64_t>(BSONObjMaxUserSize); return std::min(static_cast<uint64_t>(BSONObjMaxUserSize), - _averageObjectSizeForCloneLocs * _cloneLocs.size()); + _averageObjectSizeForCloneLocs * _cloneList.size()); } Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, @@ -866,7 +811,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, { // All clone data must have been drained before starting to fetch the incremental changes. stdx::unique_lock<Latch> lk(_mutex); - invariant(_cloneRecordIdsIter == _cloneLocs.end()); + invariant(!_cloneList.hasMore()); // The "snapshot" for delete and update list must be taken under a single lock. This is to // ensure that we will preserve the causal order of writes. Always consume the delete @@ -1045,6 +990,8 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC try { BSONObj obj; RecordId recordId; + RecordIdSet recordIdSet; + while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) { Status interruptStatus = opCtx->checkForInterruptNoAssert(); if (!interruptStatus.isOK()) { @@ -1052,20 +999,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC } if (!isLargeChunk) { - stdx::lock_guard<Latch> lk(_mutex); - _cloneLocs.insert(recordId); + recordIdSet.insert(recordId); } if (++recCount > maxRecsWhenFull) { isLargeChunk = true; if (_forceJumbo) { - _cloneLocs.clear(); + recordIdSet.clear(); break; } } } - _cloneRecordIdsIter = _cloneLocs.begin(); + + _cloneList.populateList(std::move(recordIdSet)); } catch (DBException& exception) { exception.addContext("Executor error while scanning for documents belonging to chunk"); throw; @@ -1185,13 +1132,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "response"_attr = redact(res), "memoryUsedBytes"_attr = _memoryUsed, "docsRemainingToClone"_attr = - _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver, + _cloneList.size() - _numRecordsCloned - _numRecordsPassedOver, "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes); } if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase && estimateUntransferredSessionsSize == 0) { - if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) || + if (_cloneList.hasMore() || (_jumboChunkCloneState && _forceJumbo && PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) { return {ErrorCodes::OperationIncomplete, @@ -1335,4 +1282,135 @@ MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch() return _sessionCatalogSource->getNotificationForNewOplog(); } +MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::DocumentInFlightWithLock( + WithLock lock, MigrationChunkClonerSourceLegacy::CloneList& clonerList) + : _inProgressReadToken( + std::make_unique<MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken>( + lock, clonerList)) {} + +void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::setDoc( + boost::optional<Snapshotted<BSONObj>> doc) { + _doc = std::move(doc); +} + +std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock> +MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::release() { + invariant(_inProgressReadToken); + + return std::make_unique< + MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>( + std::move(_inProgressReadToken), std::move(_doc)); +} + +MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock:: + DocumentInFlightWhileNotInLock( + std::unique_ptr<CloneList::InProgressReadToken> inProgressReadToken, + boost::optional<Snapshotted<BSONObj>> doc) + : _inProgressReadToken(std::move(inProgressReadToken)), _doc(std::move(doc)) {} + +void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::setDoc( + boost::optional<Snapshotted<BSONObj>> doc) { + _doc = std::move(doc); +} + +const boost::optional<Snapshotted<BSONObj>>& +MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::getDoc() { + return _doc; +} + +MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::InProgressReadToken( + WithLock withLock, CloneList& cloneList) + : _cloneList(cloneList) { + _cloneList._startedOneInProgressRead(withLock); +} + +MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::~InProgressReadToken() { + _cloneList._finishedOneInProgressRead(); +} + +MigrationChunkClonerSourceLegacy::CloneList::CloneList() { + _recordIdsIter = _recordIds.begin(); +} + +void MigrationChunkClonerSourceLegacy::CloneList::populateList(RecordIdSet recordIds) { + stdx::lock_guard lk(_mutex); + _recordIds = std::move(recordIds); + _recordIdsIter = _recordIds.begin(); +} + +void MigrationChunkClonerSourceLegacy::CloneList::insertOverflowDoc(Snapshotted<BSONObj> doc) { + stdx::lock_guard lk(_mutex); + invariant(_inProgressReads >= 1); + _overflowDocs.push_back(std::move(doc)); +} + +bool MigrationChunkClonerSourceLegacy::CloneList::hasMore() const { + stdx::lock_guard lk(_mutex); + return _recordIdsIter != _recordIds.cend() && _inProgressReads > 0; +} + +std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock> +MigrationChunkClonerSourceLegacy::CloneList::getNextDoc(OperationContext* opCtx, + const CollectionPtr& collection, + int* numRecordsNoLongerExist) { + while (true) { + stdx::unique_lock lk(_mutex); + invariant(_inProgressReads >= 0); + RecordId nextRecordId; + + opCtx->waitForConditionOrInterrupt(_moreDocsCV, lk, [&]() { + return _recordIdsIter != _recordIds.end() || !_overflowDocs.empty() || + _inProgressReads == 0; + }); + + DocumentInFlightWithLock docInFlight(lk, *this); + + // One of the following must now be true (corresponding to the three if conditions): + // 1. There is a document in the overflow set + // 2. The iterator has not reached the end of the record id set + // 3. The overflow set is empty, the iterator is at the end, and + // no threads are holding a document. This condition indicates + // that there are no more docs to return for the cloning phase. + if (!_overflowDocs.empty()) { + docInFlight.setDoc(std::move(_overflowDocs.front())); + _overflowDocs.pop_front(); + return docInFlight.release(); + } else if (_recordIdsIter != _recordIds.end()) { + nextRecordId = *_recordIdsIter; + ++_recordIdsIter; + } else { + return docInFlight.release(); + } + + lk.unlock(); + + auto docInFlightWhileNotLocked = docInFlight.release(); + + Snapshotted<BSONObj> doc; + if (collection->findDoc(opCtx, nextRecordId, &doc)) { + docInFlightWhileNotLocked->setDoc(std::move(doc)); + return docInFlightWhileNotLocked; + } + + if (numRecordsNoLongerExist) { + (*numRecordsNoLongerExist)++; + } + } +} + +size_t MigrationChunkClonerSourceLegacy::CloneList::size() const { + stdx::unique_lock lk(_mutex); + return _recordIds.size(); +} + +void MigrationChunkClonerSourceLegacy::CloneList::_startedOneInProgressRead(WithLock) { + _inProgressReads++; +} + +void MigrationChunkClonerSourceLegacy::CloneList::_finishedOneInProgressRead() { + stdx::lock_guard lk(_mutex); + _inProgressReads--; + _moreDocsCV.notify_one(); +} + } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 081ffc799ba..803c1f512af 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -226,6 +226,168 @@ private: friend class LogOpForShardingHandler; friend class LogTransactionOperationsForShardingHandler; + using RecordIdSet = std::set<RecordId>; + + /** + * This is responsible for all the logic revolving around handling documents that needs to be + * cloned. + * + * This class is multithread-safe. + */ + class CloneList { + public: + /** + * Simple container that increments the given counter when this is constructed and + * decrements it when it is destroyed. User of this class is responsible for holding + * necessary mutexes when counter is being modified. + */ + class InProgressReadToken { + public: + InProgressReadToken(WithLock, CloneList& cloneList); + InProgressReadToken(const InProgressReadToken&) = delete; + InProgressReadToken(InProgressReadToken&&) = default; + + ~InProgressReadToken(); + + private: + CloneList& _cloneList; + }; + + /** + * Container for a document that can be added to the nextCloneBatch call. As long as + * instances of this object exist, it will prevent getNextDoc from prematurely returning + * an empty response (which means there are no more docs left to clone). + * + * This assumes that _mutex is not being held when it is destroyed. + */ + class DocumentInFlightWhileNotInLock { + public: + DocumentInFlightWhileNotInLock(std::unique_ptr<InProgressReadToken> inProgressReadToken, + boost::optional<Snapshotted<BSONObj>> doc); + DocumentInFlightWhileNotInLock(const DocumentInFlightWhileNotInLock&) = delete; + DocumentInFlightWhileNotInLock(DocumentInFlightWhileNotInLock&&) = default; + + void setDoc(boost::optional<Snapshotted<BSONObj>> doc); + const boost::optional<Snapshotted<BSONObj>>& getDoc(); + + private: + std::unique_ptr<InProgressReadToken> _inProgressReadToken; + boost::optional<Snapshotted<BSONObj>> _doc; + }; + + /** + * A variant of the DocumentInFlightWhileNotInLock where the _mutex should be held while it + * has a document contained within it. + */ + class DocumentInFlightWithLock { + public: + DocumentInFlightWithLock(WithLock, CloneList& clonerList); + DocumentInFlightWithLock(const DocumentInFlightWithLock&) = delete; + DocumentInFlightWithLock(DocumentInFlightWithLock&&) = default; + + void setDoc(boost::optional<Snapshotted<BSONObj>> doc); + + /** + * Releases the contained document. Can only be called once for the entire lifetime + * of this object. + */ + std::unique_ptr<DocumentInFlightWhileNotInLock> release(); + + private: + std::unique_ptr<InProgressReadToken> _inProgressReadToken; + boost::optional<Snapshotted<BSONObj>> _doc; + }; + + CloneList(); + + /** + * Overwrites the list of record ids to clone. + */ + void populateList(RecordIdSet recordIds); + + /** + * Returns a document to clone. If there are no more documents left to clone, + * DocumentInFlightWhileNotInLock::getDoc will return boost::none. + * + * numRecordsNoLonger exists is an optional parameter that can be used to track + * the number of recordIds encountered that refers to a document that no longer + * exists. + */ + std::unique_ptr<DocumentInFlightWhileNotInLock> getNextDoc(OperationContext* opCtx, + const CollectionPtr& collection, + int* numRecordsNoLongerExist); + + /** + * Put back a document previously obtained from this CloneList instance to the overflow + * pool. + */ + void insertOverflowDoc(Snapshotted<BSONObj> doc); + + /** + * Returns true if there are more documents to clone. + */ + bool hasMore() const; + + /** + * Returns the size of the populated record ids. + */ + size_t size() const; + + private: + /** + * Increments the counter for inProgressReads. + */ + void _startedOneInProgressRead(WithLock); + + /** + * Decrements the counter for inProgressReads. + */ + void _finishedOneInProgressRead(); + + mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSource::CloneList::_mutex"); + + RecordIdSet _recordIds; + + // This iterator is a pointer into the _recordIds set. It allows concurrent access to + // the _recordIds set by allowing threads servicing _migrateClone requests to do the + // following: + // 1. Acquire mutex "_mutex" above. + // 2. Copy *_recordIdsIter into its local stack frame. + // 3. Increment _recordIdsIter + // 4. Unlock "_mutex." + // 5. Do the I/O to fetch the document corresponding to this record Id. + // + // The purpose of this algorithm, is to allow different threads to concurrently start I/O + // jobs in order to more fully saturate the disk. + // + // One issue with this algorithm, is that only 16MB worth of documents can be returned in + // response to a _migrateClone request. But, the thread does not know the size of a + // document until it does the I/O. At which point, if the document does not fit in the + // response to _migrateClone request the document must be made available to a different + // thread servicing a _migrateClone request. To solve this problem, the thread adds the + // document to the below _overflowDocs deque. + RecordIdSet::iterator _recordIdsIter; + + // This deque stores all documents that must be sent to the destination, but could not fit + // in the response to a particular _migrateClone request. + std::deque<Snapshotted<BSONObj>> _overflowDocs; + + // This integer represents how many documents are being "held" by threads servicing + // _migrateClone requests. Any document that is "held" by a thread may be added to the + // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request. + // This integer is necessary because it gives us a condition on when all documents to be + // sent to the destination have been exhausted. + // + // If (_recordIdsIter == _recordIds.end() && _overflowDocs.empty() && + // _inProgressReads == 0) then all documents have been returned to the destination. + RecordIdSet::size_type _inProgressReads = 0; + + // This condition variable allows us to wait on the following condition: + // Either we're done and the above condition is satisfied, or there is some document to + // return. + stdx::condition_variable _moreDocsCV; + }; + // Represents the states in which the cloner can be enum State { kNew, kCloning, kDone }; @@ -263,15 +425,6 @@ private: Status _storeCurrentLocs(OperationContext* opCtx); /** - * Returns boost::none if there are no more documents to get. - * Increments _inProgressReads if and only if return value is not none. - */ - boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx, - const CollectionPtr& collection); - - void _insertOverflowDoc(Snapshotted<BSONObj> doc); - - /** * Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as * part of session migration. */ @@ -359,49 +512,10 @@ private: // The current state of the cloner State _state{kNew}; - // List of record ids that needs to be transferred (initial clone) - std::set<RecordId> _cloneLocs; - - // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to - // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the - // following: - // 1. Acquire mutex "_mutex" above. - // 2. Copy *_cloneRecordIdsIter into its local stack frame. - // 3. Increment _cloneRecordIdsIter - // 4. Unlock "_mutex." - // 5. Do the I/O to fetch the document corresponding to this record Id. - // - // The purpose of this algorithm, is to allow different threads to concurrently start I/O jobs - // in order to more fully saturate the disk. - // - // One issue with this algorithm, is that only 16MB worth of documents can be returned in - // response to a _migrateClone request. But, the thread does not know the size of a document - // until it does the I/O. At which point, if the document does not fit in the response to - // _migrateClone request the document must be made available to a different thread servicing a - // _migrateClone request. To solve this problem, the thread adds the document - // to the below _overflowDocs deque. - std::set<RecordId>::iterator _cloneRecordIdsIter; - - // This deque stores all documents that must be sent to the destination, but could not fit - // in the response to a particular _migrateClone request. - std::deque<Snapshotted<BSONObj>> _overflowDocs; - - // This integer represents how many documents are being "held" by threads servicing - // _migrateClone requests. Any document that is "held" by a thread may be added to the - // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request. - // This integer is necessary because it gives us a condition on when all documents to be sent - // to the destination have been exhausted. - // - // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads - // == 0) then all documents have been returned to the destination. - decltype(_cloneLocs.size()) _inProgressReads = 0; - - // This condition variable allows us to wait on the following condition: - // Either we're done and the above condition is satisfied, or there is some document to - // return. - stdx::condition_variable _moreDocsCV; - decltype(_cloneLocs.size()) _numRecordsCloned{0}; - decltype(_cloneLocs.size()) _numRecordsPassedOver{0}; + CloneList _cloneList; + + RecordIdSet::size_type _numRecordsCloned{0}; + RecordIdSet::size_type _numRecordsPassedOver{0}; // The estimated average object size during the clone phase. Used for buffer size // pre-allocation (initial clone). diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 91e1b4a21bc..c062c8e2fe3 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -66,6 +66,476 @@ const ConnectionString kRecipientConnStr = HostAndPort("RecipientHost2:1234"), HostAndPort("RecipientHost3:1234")}); +class CollectionWithFault : public Collection { +public: + CollectionWithFault(const Collection* originalCollection) : _coll(originalCollection) {} + + void setFindDocStatus(Status newStatus) { + _findDocStatus = newStatus; + } + + ////////////////////////////////////////////////////////////////////////////////// + // Collection overrides + + std::shared_ptr<Collection> clone() const override { + return _coll->clone(); + } + + SharedCollectionDecorations* getSharedDecorations() const override { + return _coll->getSharedDecorations(); + } + + const NamespaceString& ns() const override { + return _coll->ns(); + } + + Status rename(OperationContext* opCtx, const NamespaceString& nss, bool stayTemp) override { + MONGO_UNREACHABLE; + } + + RecordId getCatalogId() const override { + return _coll->getCatalogId(); + } + + UUID uuid() const override { + return _coll->uuid(); + } + + const IndexCatalog* getIndexCatalog() const override { + return _coll->getIndexCatalog(); + } + + IndexCatalog* getIndexCatalog() override { + MONGO_UNREACHABLE; + } + + RecordStore* getRecordStore() const { + return _coll->getRecordStore(); + } + + std::shared_ptr<Ident> getSharedIdent() const override { + return _coll->getSharedIdent(); + } + + const BSONObj getValidatorDoc() const override { + return _coll->getValidatorDoc(); + } + + std::pair<SchemaValidationResult, Status> checkValidation( + OperationContext* opCtx, const BSONObj& document) const override { + return _coll->checkValidation(opCtx, document); + } + + bool requiresIdIndex() const override { + return _coll->requiresIdIndex(); + } + + Snapshotted<BSONObj> docFor(OperationContext* opCtx, RecordId loc) const override { + return _coll->docFor(opCtx, loc); + } + + bool findDoc(OperationContext* opCtx, RecordId loc, Snapshotted<BSONObj>* out) const override { + uassertStatusOK(_findDocStatus); + return _coll->findDoc(opCtx, loc, out); + } + + std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx, + bool forward = true) const override { + return _coll->getCursor(opCtx, forward); + } + + bool updateWithDamagesSupported() const override { + return _coll->updateWithDamagesSupported(); + } + + Status truncate(OperationContext* opCtx) override { + MONGO_UNREACHABLE; + } + + Validator parseValidator(OperationContext* opCtx, + const BSONObj& validator, + MatchExpressionParser::AllowedFeatureSet allowedFeatures, + boost::optional<multiversion::FeatureCompatibilityVersion> + maxFeatureCompatibilityVersion) const override { + return _coll->parseValidator( + opCtx, validator, allowedFeatures, maxFeatureCompatibilityVersion); + } + + void setValidator(OperationContext* opCtx, Validator validator) override { + MONGO_UNREACHABLE; + } + + Status setValidationLevel(OperationContext* opCtx, ValidationLevelEnum newLevel) override { + MONGO_UNREACHABLE; + } + + Status setValidationAction(OperationContext* opCtx, ValidationActionEnum newAction) override { + MONGO_UNREACHABLE; + } + + boost::optional<ValidationLevelEnum> getValidationLevel() const override { + return _coll->getValidationLevel(); + } + + boost::optional<ValidationActionEnum> getValidationAction() const override { + return _coll->getValidationAction(); + } + + Status updateValidator(OperationContext* opCtx, + BSONObj newValidator, + boost::optional<ValidationLevelEnum> newLevel, + boost::optional<ValidationActionEnum> newAction) override { + MONGO_UNREACHABLE; + } + + Status checkValidatorAPIVersionCompatability(OperationContext* opCtx) const override { + return _coll->checkValidatorAPIVersionCompatability(opCtx); + } + + bool isChangeStreamPreAndPostImagesEnabled() const override { + return _coll->isChangeStreamPreAndPostImagesEnabled(); + } + + void setChangeStreamPreAndPostImages(OperationContext* opCtx, + ChangeStreamPreAndPostImagesOptions val) override { + MONGO_UNREACHABLE; + } + + bool isTemporary() const override { + return _coll->isTemporary(); + } + + boost::optional<bool> getTimeseriesBucketsMayHaveMixedSchemaData() const override { + return _coll->getTimeseriesBucketsMayHaveMixedSchemaData(); + } + + void setTimeseriesBucketsMayHaveMixedSchemaData(OperationContext* opCtx, + boost::optional<bool> setting) override { + MONGO_UNREACHABLE; + } + + bool doesTimeseriesBucketsDocContainMixedSchemaData(const BSONObj& bucketsDoc) const override { + return _coll->doesTimeseriesBucketsDocContainMixedSchemaData(bucketsDoc); + } + + bool getRequiresTimeseriesExtendedRangeSupport() const override { + return _coll->getRequiresTimeseriesExtendedRangeSupport(); + } + + void setRequiresTimeseriesExtendedRangeSupport(OperationContext* opCtx) const override { + return _coll->setRequiresTimeseriesExtendedRangeSupport(opCtx); + } + + bool isClustered() const override { + return _coll->isClustered(); + } + + boost::optional<ClusteredCollectionInfo> getClusteredInfo() const override { + return _coll->getClusteredInfo(); + } + + void updateClusteredIndexTTLSetting(OperationContext* opCtx, + boost::optional<int64_t> expireAfterSeconds) override { + MONGO_UNREACHABLE; + } + + Status updateCappedSize(OperationContext* opCtx, + boost::optional<long long> newCappedSize, + boost::optional<long long> newCappedMax) override { + MONGO_UNREACHABLE; + } + + StatusWith<int> checkMetaDataForIndex(const std::string& indexName, + const BSONObj& spec) const override { + return _coll->checkMetaDataForIndex(indexName, spec); + } + + void updateTTLSetting(OperationContext* opCtx, + StringData idxName, + long long newExpireSeconds) override { + MONGO_UNREACHABLE; + } + + void updateHiddenSetting(OperationContext* opCtx, StringData idxName, bool hidden) override { + MONGO_UNREACHABLE; + } + + void updateUniqueSetting(OperationContext* opCtx, StringData idxName, bool unique) override { + MONGO_UNREACHABLE; + } + + void updatePrepareUniqueSetting(OperationContext* opCtx, + StringData idxName, + bool prepareUnique) override { + MONGO_UNREACHABLE; + } + + void setIsTemp(OperationContext* opCtx, bool isTemp) override { + MONGO_UNREACHABLE; + } + + void removeIndex(OperationContext* opCtx, StringData indexName) override { + MONGO_UNREACHABLE; + } + + Status prepareForIndexBuild(OperationContext* opCtx, + const IndexDescriptor* spec, + boost::optional<UUID> buildUUID, + bool isBackgroundSecondaryBuild) override { + MONGO_UNREACHABLE; + } + + boost::optional<UUID> getIndexBuildUUID(StringData indexName) const override { + return _coll->getIndexBuildUUID(indexName); + } + + bool isIndexMultikey(OperationContext* opCtx, + StringData indexName, + MultikeyPaths* multikeyPaths, + int indexOffset = -1) const override { + return _coll->isIndexMultikey(opCtx, indexName, multikeyPaths, indexOffset); + } + + bool setIndexIsMultikey(OperationContext* opCtx, + StringData indexName, + const MultikeyPaths& multikeyPaths, + int indexOffset = -1) const override { + return _coll->setIndexIsMultikey(opCtx, indexName, multikeyPaths, indexOffset); + } + + void forceSetIndexIsMultikey(OperationContext* opCtx, + const IndexDescriptor* desc, + bool isMultikey, + const MultikeyPaths& multikeyPaths) const override { + return _coll->forceSetIndexIsMultikey(opCtx, desc, isMultikey, multikeyPaths); + } + + int getTotalIndexCount() const override { + return _coll->getTotalIndexCount(); + } + + int getCompletedIndexCount() const override { + return _coll->getCompletedIndexCount(); + } + + BSONObj getIndexSpec(StringData indexName) const override { + return _coll->getIndexSpec(indexName); + } + + void getAllIndexes(std::vector<std::string>* names) const override { + return _coll->getAllIndexes(names); + } + + void getReadyIndexes(std::vector<std::string>* names) const override { + return _coll->getReadyIndexes(names); + } + + bool isIndexPresent(StringData indexName) const override { + return _coll->isIndexPresent(indexName); + } + + bool isIndexReady(StringData indexName) const override { + return _coll->isIndexReady(indexName); + } + + void replaceMetadata(OperationContext* opCtx, + std::shared_ptr<BSONCollectionCatalogEntry::MetaData> md) override { + MONGO_UNREACHABLE; + } + + bool isCapped() const override { + return _coll->isCapped(); + } + + long long getCappedMaxDocs() const override { + return _coll->getCappedMaxDocs(); + } + + long long getCappedMaxSize() const override { + return _coll->getCappedMaxSize(); + } + + long long numRecords(OperationContext* opCtx) const override { + return _coll->numRecords(opCtx); + } + + long long dataSize(OperationContext* opCtx) const override { + return _coll->dataSize(opCtx); + } + + bool isEmpty(OperationContext* opCtx) const override { + return _coll->isEmpty(opCtx); + } + + int averageObjectSize(OperationContext* opCtx) const override { + return _coll->averageObjectSize(opCtx); + } + + uint64_t getIndexSize(OperationContext* opCtx, + BSONObjBuilder* details = nullptr, + int scale = 1) const { + return _coll->getIndexSize(opCtx, details, scale); + } + + uint64_t getIndexFreeStorageBytes(OperationContext* opCtx) const override { + return _coll->getIndexFreeStorageBytes(opCtx); + } + + boost::optional<Timestamp> getMinimumVisibleSnapshot() const override { + return _coll->getMinimumVisibleSnapshot(); + } + + void setMinimumVisibleSnapshot(Timestamp name) override { + MONGO_UNREACHABLE; + } + + boost::optional<TimeseriesOptions> getTimeseriesOptions() const override { + return _coll->getTimeseriesOptions(); + } + + void setTimeseriesOptions(OperationContext* opCtx, + const TimeseriesOptions& tsOptions) override { + MONGO_UNREACHABLE; + } + + const CollatorInterface* getDefaultCollator() const override { + return _coll->getDefaultCollator(); + } + + const CollectionOptions& getCollectionOptions() const override { + return _coll->getCollectionOptions(); + } + + StatusWith<std::vector<BSONObj>> addCollationDefaultsToIndexSpecsForCreate( + OperationContext* opCtx, const std::vector<BSONObj>& indexSpecs) const { + return _coll->addCollationDefaultsToIndexSpecsForCreate(opCtx, indexSpecs); + } + + void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) override { + MONGO_UNREACHABLE; + } + + void onDeregisterFromCatalog(OperationContext* opCtx) override { + MONGO_UNREACHABLE; + } + + void deleteDocument(OperationContext* opCtx, + StmtId stmtId, + RecordId loc, + OpDebug* opDebug, + bool fromMigrate = false, + bool noWarn = false, + StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off, + CheckRecordId checkRecordId = CheckRecordId::Off) const override { + MONGO_UNREACHABLE; + } + + void deleteDocument(OperationContext* opCtx, + Snapshotted<BSONObj> doc, + StmtId stmtId, + RecordId loc, + OpDebug* opDebug, + bool fromMigrate = false, + bool noWarn = false, + StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off, + CheckRecordId checkRecordId = CheckRecordId::Off) const override { + MONGO_UNREACHABLE; + } + + Status insertDocuments(OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator begin, + std::vector<InsertStatement>::const_iterator end, + OpDebug* opDebug, + bool fromMigrate = false) const override { + MONGO_UNREACHABLE; + } + + Status insertDocument(OperationContext* opCtx, + const InsertStatement& doc, + OpDebug* opDebug, + bool fromMigrate = false) const override { + MONGO_UNREACHABLE; + } + + Status insertDocumentsForOplog(OperationContext* opCtx, + std::vector<Record>* records, + const std::vector<Timestamp>& timestamps) const override { + MONGO_UNREACHABLE; + } + + Status insertDocumentForBulkLoader(OperationContext* opCtx, + const BSONObj& doc, + const OnRecordInsertedFn& onRecordInserted) const override { + MONGO_UNREACHABLE; + } + + RecordId updateDocument(OperationContext* opCtx, + RecordId oldLocation, + const Snapshotted<BSONObj>& oldDoc, + const BSONObj& newDoc, + bool indexesAffected, + OpDebug* opDebug, + CollectionUpdateArgs* args) const override { + MONGO_UNREACHABLE; + } + + StatusWith<RecordData> updateDocumentWithDamages(OperationContext* opCtx, + RecordId loc, + const Snapshotted<RecordData>& oldRec, + const char* damageSource, + const mutablebson::DamageVector& damages, + CollectionUpdateArgs* args) const { + MONGO_UNREACHABLE; + } + + void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) const override { + MONGO_UNREACHABLE; + } + + bool getRecordPreImages() const override { + return _coll->getRecordPreImages(); + } + + void setRecordPreImages(OperationContext* opCtx, bool val) override { + MONGO_UNREACHABLE; + } + + std::vector<std::string> removeInvalidIndexOptions(OperationContext* opCtx) override { + MONGO_UNREACHABLE; + } + + CappedCallback* getCappedCallback() override { + MONGO_UNREACHABLE; + } + + const CappedCallback* getCappedCallback() const override { + return _coll->getCappedCallback(); + } + + std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier() const override { + return _coll->getCappedInsertNotifier(); + } + + std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor( + OperationContext* opCtx, + const CollectionPtr& yieldableCollection, + PlanYieldPolicy::YieldPolicy yieldPolicy, + ScanDirection scanDirection, + boost::optional<RecordId> resumeAfterRecordId = boost::none) const override { + return _coll->makePlanExecutor( + opCtx, yieldableCollection, yieldPolicy, scanDirection, resumeAfterRecordId); + } + + void establishOplogCollectionForLogging(OperationContext* opCtx) const override { + return _coll->establishOplogCollectionForLogging(opCtx); + } + +private: + const Collection* _coll; + + Status _findDocStatus{Status::OK()}; +}; + class MigrationChunkClonerSourceLegacyTest : public ShardServerTestFixture { protected: MigrationChunkClonerSourceLegacyTest() : ShardServerTestFixture(Options{}.useMockClock(true)) {} @@ -127,9 +597,29 @@ protected: if (docs.empty()) return; - auto response = client()->insertAcknowledged(kNss.ns(), docs); - ASSERT_OK(getStatusFromWriteCommandReply(response)); - ASSERT_GT(response["n"].Int(), 0); + std::deque<BSONObj> docsToInsert; + std::copy(docs.cbegin(), docs.cend(), std::back_inserter(docsToInsert)); + + while (!docsToInsert.empty()) { + std::vector<BSONObj> batchToInsert; + + size_t sizeInBatch = 0; + while (!docsToInsert.empty()) { + auto next = docsToInsert.front(); + sizeInBatch += next.objsize(); + + if (sizeInBatch > BSONObjMaxUserSize) { + break; + } + + batchToInsert.push_back(next); + docsToInsert.pop_front(); + } + + auto response = client()->insertAcknowledged(kNss.ns(), batchToInsert); + ASSERT_OK(getStatusFromWriteCommandReply(response)); + ASSERT_GT(response["n"].Int(), 0); + } } void deleteDocsInShardedCollection(BSONObj query) { @@ -701,5 +1191,164 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) { cloner.cancelClone(operationContext()); } +TEST_F(MigrationChunkClonerSourceLegacyTest, CloneFetchThatOverflows) { + const auto kBigSize = 10 * 1024 * 1024; + const std::vector<BSONObj> contents = {createSizedCollectionDocument(100, kBigSize), + createSizedCollectionDocument(120, kBigSize), + createSizedCollectionDocument(199, kBigSize)}; + + createShardedCollection(contents); + + ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + req.setMaxChunkSizeBytes(64 * 1024 * 1024); + + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); + + { + auto futureStartClone = launchAsync([&]() { + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + + ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); + futureStartClone.default_timed_get(); + } + + // Ensure the initial clone documents are available + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IS); + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(1, arrBuilder.arrSize()); + + const auto arr = arrBuilder.arr(); + ASSERT_BSONOBJ_EQ(contents[0], arr[0].Obj()); + } + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(1, arrBuilder.arrSize()); + + const auto arr = arrBuilder.arr(); + ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj()); + } + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(1, arrBuilder.arrSize()); + + const auto arr = arrBuilder.arr(); + ASSERT_BSONOBJ_EQ(contents[2], arr[0].Obj()); + } + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + ASSERT_EQ(0, arrBuilder.arrSize()); + } + } + + auto futureCommit = launchAsync([&]() { + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + + ASSERT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */)); + futureCommit.default_timed_get(); +} + +TEST_F(MigrationChunkClonerSourceLegacyTest, CloneShouldNotCrashWhenNextCloneBatchThrows) { + const std::vector<BSONObj> contents = {createCollectionDocument(100), + createCollectionDocument(150), + createCollectionDocument(199)}; + + createShardedCollection(contents); + + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); + + { + auto futureStartClone = launchAsync([&]() { + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + + ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); + futureStartClone.default_timed_get(); + } + + { + AutoGetCollection autoColl(operationContext(), kNss, MODE_IS); + + { + auto collWithFault = + std::make_unique<CollectionWithFault>(autoColl.getCollection().get()); + CollectionPtr collPtrWithFault(collWithFault.get(), CollectionPtr::NoYieldTag()); + + // Note: findDoc currently doesn't have any interruption points, this test simulates + // an exception being thrown while it is being called. + collWithFault->setFindDocStatus({ErrorCodes::Interrupted, "fake interrupt"}); + + BSONArrayBuilder arrBuilder; + + ASSERT_THROWS_CODE( + cloner.nextCloneBatch(operationContext(), collPtrWithFault, &arrBuilder), + DBException, + ErrorCodes::Interrupted); + ASSERT_EQ(0, arrBuilder.arrSize()); + } + + // The first document was lost and returned an error during nextCloneBatch. This would + // cause the migration destination to abort, but it is still possible for other + // threads to be in the middle of calling nextCloneBatch and the next nextCloneBatch + // calls simulate calls from other threads after the first call threw. + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + + const auto arr = arrBuilder.arr(); + ASSERT_EQ(2, arrBuilder.arrSize()); + + ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj()); + ASSERT_BSONOBJ_EQ(contents[2], arr[1].Obj()); + } + + { + BSONArrayBuilder arrBuilder; + ASSERT_OK( + cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder)); + + const auto arr = arrBuilder.arr(); + ASSERT_EQ(0, arrBuilder.arrSize()); + } + } + + auto futureCommit = launchAsync([&]() { + // Simulate destination returning an error. + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << false); }); + + // This is the return response for recvChunkAbort. + onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); }); + }); + + ASSERT_NOT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */)); + futureCommit.default_timed_get(); +} + } // namespace } // namespace mongo |