author     Randolph Tan <randolph@10gen.com>    2023-03-02 15:54:36 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-03-02 17:26:25 +0000
commit     3f72fcd96b8ce1738d76d529baa01a2db0a62ac1 (patch)
tree       f408922144963ceccfad6c4af128265d89b71463
parent     5321501d5dc7814641fc3ba6e5535b038ddbf4dd (diff)
Revert "SERVER-72619 Refactor and add more testing for migration_chunk_cloner_source changes"
This reverts commit 6659be6a462ee241f6b0a80a27ae9440bebf9216.
3 files changed, 133 insertions, 974 deletions
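
Before the diff itself, it helps to see the shape of the code being restored. The revert replaces the CloneList helper class with a handful of members inlined on the cloner (_cloneLocs, _cloneRecordIdsIter, _overflowDocs, _inProgressReads, _moreDocsCV), but both versions implement the same concurrency pattern: threads hand out record ids under a mutex, drop the mutex for the slow disk read, park documents that could not be used in an overflow deque, and use an in-progress counter plus a condition variable to distinguish "no more work" from "work still in flight". The following is a minimal, self-contained C++ sketch of that pattern; it is not MongoDB code, and every name in it (CloneQueue, getNext, insertOverflow, finishOne) is hypothetical:

    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <optional>
    #include <set>

    class CloneQueue {
    public:
        explicit CloneQueue(std::set<int> ids) : _ids(std::move(ids)), _iter(_ids.begin()) {}

        // Hands out the next id, or std::nullopt once the set is drained, the
        // overflow deque is empty, and no other thread still holds an id.
        std::optional<int> getNext() {
            std::unique_lock<std::mutex> lk(_mutex);
            _moreCV.wait(lk, [&] {
                return _iter != _ids.end() || !_overflow.empty() || _inProgress == 0;
            });
            if (!_overflow.empty()) {
                int id = _overflow.front();
                _overflow.pop_front();
                ++_inProgress;
                return id;
            }
            if (_iter != _ids.end()) {
                ++_inProgress;
                return *_iter++;
            }
            _moreCV.notify_one();  // chain the wakeup so every waiter eventually exits
            return std::nullopt;
        }

        // Parks an id that was handed out but could not be consumed
        // (e.g. the response batch was already full).
        void insertOverflow(int id) {
            std::lock_guard<std::mutex> lk(_mutex);
            _overflow.push_back(id);
        }

        // Must be called once per successful getNext(), even on error paths,
        // so waiters can re-evaluate the completion condition.
        void finishOne() {
            std::lock_guard<std::mutex> lk(_mutex);
            --_inProgress;
            _moreCV.notify_one();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _moreCV;
        std::set<int> _ids;
        std::set<int>::iterator _iter;
        std::deque<int> _overflow;
        std::size_t _inProgress = 0;
    };

A worker loops while (auto id = queue.getNext()), performs the read without holding the lock, and calls finishOne() once the item has been consumed or parked; this mirrors the loop in _nextCloneBatchFromCloneLocs in the diff below.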
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 1f11e168733..d56601cb85b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -427,7 +427,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
         }
     } else {
         invariant(PlanExecutor::IS_EOF == _jumboChunkCloneState->clonerState);
-        invariant(_cloneList.hasMore());
+        invariant(_cloneLocs.empty());
     }
 }

@@ -726,6 +726,61 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromIndexScan(OperationCon
     _jumboChunkCloneState->clonerExec->detachFromOperationContext();
 }

+boost::optional<Snapshotted<BSONObj>> MigrationChunkClonerSourceLegacy::_getNextDoc(
+    OperationContext* opCtx, const CollectionPtr& collection) {
+    while (true) {
+        stdx::unique_lock lk(_mutex);
+        invariant(_inProgressReads >= 0);
+        RecordId nextRecordId;
+        Snapshotted<BSONObj> doc;
+
+        _moreDocsCV.wait(lk, [&]() {
+            return _cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty() ||
+                _inProgressReads == 0;
+        });
+
+        // One of the following must now be true (corresponding to the three if conditions):
+        //   1. There is a document in the overflow set
+        //   2. The iterator has not reached the end of the record id set
+        //   3. The overflow set is empty, the iterator is at the end, and
+        //      no threads are holding a document. This condition indicates
+        //      that there are no more docs to return for the cloning phase.
+        if (!_overflowDocs.empty()) {
+            doc = std::move(_overflowDocs.front());
+            _overflowDocs.pop_front();
+            ++_inProgressReads;
+            return doc;
+        } else if (_cloneRecordIdsIter != _cloneLocs.end()) {
+            nextRecordId = *_cloneRecordIdsIter;
+            ++_cloneRecordIdsIter;
+            ++_inProgressReads;
+        } else {
+            invariant(_numRecordsCloned + _numRecordsPassedOver == _cloneLocs.size());
+            return boost::none;
+        }
+
+        // In order to saturate the disk, the I/O operation must occur without
+        // holding the mutex.
+        lk.unlock();
+        if (collection->findDoc(opCtx, nextRecordId, &doc))
+            return doc;
+        lk.lock();
+        ++_numRecordsPassedOver;
+
+        // It is possible that this document is no longer in the collection,
+        // in which case, we try again and indicate to other threads that this
+        // thread is not holding a document.
+        --_inProgressReads;
+        _moreDocsCV.notify_one();
+    }
+}
+
+void MigrationChunkClonerSourceLegacy::_insertOverflowDoc(Snapshotted<BSONObj> doc) {
+    stdx::lock_guard lk(_mutex);
+    invariant(_inProgressReads >= 1);
+    _overflowDocs.push_back(std::move(doc));
+}
+
 void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationContext* opCtx,
                                                                     const CollectionPtr& collection,
                                                                     BSONArrayBuilder* arrBuilder) {
@@ -733,24 +788,18 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
         internalQueryExecYieldIterations.load(),
         Milliseconds(internalQueryExecYieldPeriodMS.load()));

-    while (true) {
-        int recordsNoLongerExist = 0;
-        auto docInFlight = _cloneList.getNextDoc(opCtx, collection, &recordsNoLongerExist);
-
-        if (recordsNoLongerExist) {
+    while (auto doc = _getNextDoc(opCtx, collection)) {
+        ON_BLOCK_EXIT([&]() {
             stdx::lock_guard lk(_mutex);
-            _numRecordsPassedOver += recordsNoLongerExist;
-        }
-
-        const auto& doc = docInFlight->getDoc();
-        if (!doc) {
-            break;
-        }
+            invariant(_inProgressReads > 0);
+            --_inProgressReads;
+            _moreDocsCV.notify_one();
+        });

         // We must always make progress in this method by at least one document because empty
         // return indicates there is no more initial clone data.
         if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) {
-            _cloneList.insertOverflowDoc(*doc);
+            _insertOverflowDoc(std::move(*doc));
             break;
         }

@@ -772,7 +821,7 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
         // that we take into consideration the overhead of BSONArray indices.
         if (arrBuilder->arrSize() &&
             (arrBuilder->len() + doc->value().objsize() + 1024) > BSONObjMaxUserSize) {
-            _cloneList.insertOverflowDoc(*doc);
+            _insertOverflowDoc(std::move(*doc));
             break;
         }

@@ -783,6 +832,12 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
         arrBuilder->append(doc->value());
         ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
     }
+
+    // When we reach here, there are no more documents to return to the destination.
+    // We therefore need to notify other threads that may be sleeping on the condition
+    // variable that we are done.
+    stdx::lock_guard lk(_mutex);
+    _moreDocsCV.notify_one();
 }

 uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -791,7 +846,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
         return static_cast<uint64_t>(BSONObjMaxUserSize);

     return std::min(static_cast<uint64_t>(BSONObjMaxUserSize),
-                    _averageObjectSizeForCloneLocs * _cloneList.size());
+                    _averageObjectSizeForCloneLocs * _cloneLocs.size());
 }

 Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
@@ -892,7 +947,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
     {
         // All clone data must have been drained before starting to fetch the incremental changes.
         stdx::unique_lock<Latch> lk(_mutex);
-        invariant(!_cloneList.hasMore());
+        invariant(_cloneRecordIdsIter == _cloneLocs.end());

         // The "snapshot" for delete and update list must be taken under a single lock. This is to
         // ensure that we will preserve the causal order of writes. Always consume the delete
@@ -1074,8 +1129,6 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
     try {
         BSONObj obj;
         RecordId recordId;
-        RecordIdSet recordIdSet;
-
         while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) {
             Status interruptStatus = opCtx->checkForInterruptNoAssert();
             if (!interruptStatus.isOK()) {
@@ -1083,20 +1136,20 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
             }

             if (!isLargeChunk) {
-                recordIdSet.insert(recordId);
+                stdx::lock_guard<Latch> lk(_mutex);
+                _cloneLocs.insert(recordId);
             }

             if (++recCount > maxRecsWhenFull) {
                 isLargeChunk = true;

                 if (_forceJumbo) {
-                    recordIdSet.clear();
+                    _cloneLocs.clear();
                     break;
                 }
             }
         }
-
-        _cloneList.populateList(std::move(recordIdSet));
+        _cloneRecordIdsIter = _cloneLocs.begin();
     } catch (DBException& exception) {
         exception.addContext("Executor error while scanning for documents belonging to chunk");
         throw;
@@ -1217,13 +1270,13 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
                   "response"_attr = redact(res),
                   "memoryUsedBytes"_attr = _memoryUsed,
                   "docsRemainingToClone"_attr =
-                      _cloneList.size() - _numRecordsCloned - _numRecordsPassedOver,
+                      _cloneLocs.size() - _numRecordsCloned - _numRecordsPassedOver,
                   "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes);
     }

     if (res["state"].String() == "steady" && sessionCatalogSourceInCatchupPhase &&
         estimateUntransferredSessionsSize == 0) {
-        if (_cloneList.hasMore() ||
+        if ((_cloneRecordIdsIter != _cloneLocs.end() || !_overflowDocs.empty()) ||
             (_jumboChunkCloneState && _forceJumbo &&
              PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
             return {ErrorCodes::OperationIncomplete,
@@ -1368,135 +1421,4 @@ MigrationChunkClonerSourceLegacy::getNotificationForNextSessionMigrationBatch()
     return _sessionCatalogSource->getNotificationForNewOplog();
 }

-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::DocumentInFlightWithLock(
-    WithLock lock, MigrationChunkClonerSourceLegacy::CloneList& clonerList)
-    : _inProgressReadToken(
-          std::make_unique<MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken>(
-              lock, clonerList)) {}
-
-void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::setDoc(
-    boost::optional<Snapshotted<BSONObj>> doc) {
-    _doc = std::move(doc);
-}
-
-std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWithLock::release() {
-    invariant(_inProgressReadToken);
-
-    return std::make_unique<
-        MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>(
-        std::move(_inProgressReadToken), std::move(_doc));
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::
-    DocumentInFlightWhileNotInLock(
-        std::unique_ptr<CloneList::InProgressReadToken> inProgressReadToken,
-        boost::optional<Snapshotted<BSONObj>> doc)
-    : _inProgressReadToken(std::move(inProgressReadToken)), _doc(std::move(doc)) {}
-
-void MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::setDoc(
-    boost::optional<Snapshotted<BSONObj>> doc) {
-    _doc = std::move(doc);
-}
-
-const boost::optional<Snapshotted<BSONObj>>&
-MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock::getDoc() {
-    return _doc;
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::InProgressReadToken(
-    WithLock withLock, CloneList& cloneList)
-    : _cloneList(cloneList) {
-    _cloneList._startedOneInProgressRead(withLock);
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::InProgressReadToken::~InProgressReadToken() {
-    _cloneList._finishedOneInProgressRead();
-}
-
-MigrationChunkClonerSourceLegacy::CloneList::CloneList() {
-    _recordIdsIter = _recordIds.begin();
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::populateList(RecordIdSet recordIds) {
-    stdx::lock_guard lk(_mutex);
-    _recordIds = std::move(recordIds);
-    _recordIdsIter = _recordIds.begin();
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::insertOverflowDoc(Snapshotted<BSONObj> doc) {
-    stdx::lock_guard lk(_mutex);
-    invariant(_inProgressReads >= 1);
-    _overflowDocs.push_back(std::move(doc));
-}
-
-bool MigrationChunkClonerSourceLegacy::CloneList::hasMore() const {
-    stdx::lock_guard lk(_mutex);
-    return _recordIdsIter != _recordIds.cend() && _inProgressReads > 0;
-}
-
-std::unique_ptr<MigrationChunkClonerSourceLegacy::CloneList::DocumentInFlightWhileNotInLock>
-MigrationChunkClonerSourceLegacy::CloneList::getNextDoc(OperationContext* opCtx,
-                                                        const CollectionPtr& collection,
-                                                        int* numRecordsNoLongerExist) {
-    while (true) {
-        stdx::unique_lock lk(_mutex);
-        invariant(_inProgressReads >= 0);
-        RecordId nextRecordId;
-
-        opCtx->waitForConditionOrInterrupt(_moreDocsCV, lk, [&]() {
-            return _recordIdsIter != _recordIds.end() || !_overflowDocs.empty() ||
-                _inProgressReads == 0;
-        });
-
-        DocumentInFlightWithLock docInFlight(lk, *this);
-
-        // One of the following must now be true (corresponding to the three if conditions):
-        //   1. There is a document in the overflow set
-        //   2. The iterator has not reached the end of the record id set
-        //   3. The overflow set is empty, the iterator is at the end, and
-        //      no threads are holding a document. This condition indicates
-        //      that there are no more docs to return for the cloning phase.
-        if (!_overflowDocs.empty()) {
-            docInFlight.setDoc(std::move(_overflowDocs.front()));
-            _overflowDocs.pop_front();
-            return docInFlight.release();
-        } else if (_recordIdsIter != _recordIds.end()) {
-            nextRecordId = *_recordIdsIter;
-            ++_recordIdsIter;
-        } else {
-            return docInFlight.release();
-        }
-
-        lk.unlock();
-
-        auto docInFlightWhileNotLocked = docInFlight.release();
-
-        Snapshotted<BSONObj> doc;
-        if (collection->findDoc(opCtx, nextRecordId, &doc)) {
-            docInFlightWhileNotLocked->setDoc(std::move(doc));
-            return docInFlightWhileNotLocked;
-        }
-
-        if (numRecordsNoLongerExist) {
-            (*numRecordsNoLongerExist)++;
-        }
-    }
-}
-
-size_t MigrationChunkClonerSourceLegacy::CloneList::size() const {
-    stdx::unique_lock lk(_mutex);
-    return _recordIds.size();
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::_startedOneInProgressRead(WithLock) {
-    _inProgressReads++;
-}
-
-void MigrationChunkClonerSourceLegacy::CloneList::_finishedOneInProgressRead() {
-    stdx::lock_guard lk(_mutex);
-    _inProgressReads--;
-    _moreDocsCV.notify_one();
-}
-
 } // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 4fff7da8d17..7a59f6029bc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -228,168 +228,6 @@ private:
     friend class LogOpForShardingHandler;
     friend class LogTransactionOperationsForShardingHandler;

-    using RecordIdSet = std::set<RecordId>;
-
-    /**
-     * This is responsible for all the logic revolving around handling documents that need to be
-     * cloned.
-     *
-     * This class is multithread-safe.
-     */
-    class CloneList {
-    public:
-        /**
-         * Simple container that increments the given counter when this is constructed and
-         * decrements it when it is destroyed. The user of this class is responsible for holding
-         * the necessary mutexes when the counter is being modified.
-         */
-        class InProgressReadToken {
-        public:
-            InProgressReadToken(WithLock, CloneList& cloneList);
-            InProgressReadToken(const InProgressReadToken&) = delete;
-            InProgressReadToken(InProgressReadToken&&) = default;
-
-            ~InProgressReadToken();
-
-        private:
-            CloneList& _cloneList;
-        };
-
-        /**
-         * Container for a document that can be added to the nextCloneBatch call. As long as
-         * instances of this object exist, it will prevent getNextDoc from prematurely returning
-         * an empty response (which means there are no more docs left to clone).
-         *
-         * This assumes that _mutex is not being held when it is destroyed.
-         */
-        class DocumentInFlightWhileNotInLock {
-        public:
-            DocumentInFlightWhileNotInLock(std::unique_ptr<InProgressReadToken> inProgressReadToken,
-                                           boost::optional<Snapshotted<BSONObj>> doc);
-            DocumentInFlightWhileNotInLock(const DocumentInFlightWhileNotInLock&) = delete;
-            DocumentInFlightWhileNotInLock(DocumentInFlightWhileNotInLock&&) = default;
-
-            void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
-            const boost::optional<Snapshotted<BSONObj>>& getDoc();
-
-        private:
-            std::unique_ptr<InProgressReadToken> _inProgressReadToken;
-            boost::optional<Snapshotted<BSONObj>> _doc;
-        };
-
-        /**
-         * A variant of the DocumentInFlightWhileNotInLock where the _mutex should be held while it
-         * has a document contained within it.
-         */
-        class DocumentInFlightWithLock {
-        public:
-            DocumentInFlightWithLock(WithLock, CloneList& clonerList);
-            DocumentInFlightWithLock(const DocumentInFlightWithLock&) = delete;
-            DocumentInFlightWithLock(DocumentInFlightWithLock&&) = default;
-
-            void setDoc(boost::optional<Snapshotted<BSONObj>> doc);
-
-            /**
-             * Releases the contained document. Can only be called once for the entire lifetime
-             * of this object.
-             */
-            std::unique_ptr<DocumentInFlightWhileNotInLock> release();
-
-        private:
-            std::unique_ptr<InProgressReadToken> _inProgressReadToken;
-            boost::optional<Snapshotted<BSONObj>> _doc;
-        };
-
-        CloneList();
-
-        /**
-         * Overwrites the list of record ids to clone.
-         */
-        void populateList(RecordIdSet recordIds);
-
-        /**
-         * Returns a document to clone. If there are no more documents left to clone,
-         * DocumentInFlightWhileNotInLock::getDoc will return boost::none.
-         *
-         * numRecordsNoLongerExist is an optional out parameter that can be used to track
-         * the number of record ids encountered that refer to a document that no longer
-         * exists.
-         */
-        std::unique_ptr<DocumentInFlightWhileNotInLock> getNextDoc(OperationContext* opCtx,
-                                                                   const CollectionPtr& collection,
-                                                                   int* numRecordsNoLongerExist);
-
-        /**
-         * Puts back a document previously obtained from this CloneList instance into the overflow
-         * pool.
-         */
-        void insertOverflowDoc(Snapshotted<BSONObj> doc);
-
-        /**
-         * Returns true if there are more documents to clone.
-         */
-        bool hasMore() const;
-
-        /**
-         * Returns the size of the populated record ids.
-         */
-        size_t size() const;
-
-    private:
-        /**
-         * Increments the counter for inProgressReads.
-         */
-        void _startedOneInProgressRead(WithLock);
-
-        /**
-         * Decrements the counter for inProgressReads.
-         */
-        void _finishedOneInProgressRead();
-
-        mutable Mutex _mutex = MONGO_MAKE_LATCH("MigrationChunkClonerSource::CloneList::_mutex");
-
-        RecordIdSet _recordIds;
-
-        // This iterator is a pointer into the _recordIds set. It allows concurrent access to
-        // the _recordIds set by allowing threads servicing _migrateClone requests to do the
-        // following:
-        //   1. Acquire the mutex "_mutex" above.
-        //   2. Copy *_recordIdsIter into its local stack frame.
-        //   3. Increment _recordIdsIter.
-        //   4. Unlock "_mutex".
-        //   5. Do the I/O to fetch the document corresponding to this record id.
-        //
-        // The purpose of this algorithm is to allow different threads to concurrently start I/O
-        // jobs in order to more fully saturate the disk.
-        //
-        // One issue with this algorithm is that only 16MB worth of documents can be returned in
-        // response to a _migrateClone request. But the thread does not know the size of a
-        // document until it does the I/O. At that point, if the document does not fit in the
-        // response to the _migrateClone request, the document must be made available to a
-        // different thread servicing a _migrateClone request. To solve this problem, the thread
-        // adds the document to the _overflowDocs deque below.
-        RecordIdSet::iterator _recordIdsIter;
-
-        // This deque stores all documents that must be sent to the destination, but could not fit
-        // in the response to a particular _migrateClone request.
-        std::deque<Snapshotted<BSONObj>> _overflowDocs;
-
-        // This integer represents how many documents are being "held" by threads servicing
-        // _migrateClone requests. Any document that is "held" by a thread may be added to the
-        // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
-        // This integer is necessary because it gives us a condition on when all documents to be
-        // sent to the destination have been exhausted.
-        //
-        // If (_recordIdsIter == _recordIds.end() && _overflowDocs.empty() &&
-        //     _inProgressReads == 0) then all documents have been returned to the destination.
-        RecordIdSet::size_type _inProgressReads = 0;
-
-        // This condition variable allows us to wait on the following condition:
-        // Either we're done and the above condition is satisfied, or there is some document to
-        // return.
-        stdx::condition_variable _moreDocsCV;
-    };
-
     // Represents the states in which the cloner can be
     enum State { kNew, kCloning, kDone };

@@ -427,6 +265,15 @@ private:
     Status _storeCurrentLocs(OperationContext* opCtx);

     /**
+     * Returns boost::none if there are no more documents to get.
+     * Increments _inProgressReads if and only if the return value is not none.
+     */
+    boost::optional<Snapshotted<BSONObj>> _getNextDoc(OperationContext* opCtx,
+                                                      const CollectionPtr& collection);
+
+    void _insertOverflowDoc(Snapshotted<BSONObj> doc);
+
+    /**
      * Adds the OpTime to the list of OpTimes for oplog entries that we should consider migrating as
      * part of session migration.
      */
@@ -531,10 +378,49 @@ private:
     // The current state of the cloner
     State _state{kNew};

-    CloneList _cloneList;
-
-    RecordIdSet::size_type _numRecordsCloned{0};
-    RecordIdSet::size_type _numRecordsPassedOver{0};
+    // List of record ids that need to be transferred (initial clone)
+    std::set<RecordId> _cloneLocs;
+
+    // This iterator is a pointer into the _cloneLocs set. It allows concurrent access to
+    // the _cloneLocs set by allowing threads servicing _migrateClone requests to do the
+    // following:
+    //   1. Acquire the mutex "_mutex" above.
+    //   2. Copy *_cloneRecordIdsIter into its local stack frame.
+    //   3. Increment _cloneRecordIdsIter.
+    //   4. Unlock "_mutex".
+    //   5. Do the I/O to fetch the document corresponding to this record id.
+    //
+    // The purpose of this algorithm is to allow different threads to concurrently start I/O jobs
+    // in order to more fully saturate the disk.
+    //
+    // One issue with this algorithm is that only 16MB worth of documents can be returned in
+    // response to a _migrateClone request. But the thread does not know the size of a document
+    // until it does the I/O. At that point, if the document does not fit in the response to the
+    // _migrateClone request, the document must be made available to a different thread servicing
+    // a _migrateClone request. To solve this problem, the thread adds the document
+    // to the _overflowDocs deque below.
+    std::set<RecordId>::iterator _cloneRecordIdsIter;
+
+    // This deque stores all documents that must be sent to the destination, but could not fit
+    // in the response to a particular _migrateClone request.
+    std::deque<Snapshotted<BSONObj>> _overflowDocs;
+
+    // This integer represents how many documents are being "held" by threads servicing
+    // _migrateClone requests. Any document that is "held" by a thread may be added to the
+    // _overflowDocs deque if it doesn't fit in the response to a _migrateClone request.
+    // This integer is necessary because it gives us a condition on when all documents to be sent
+    // to the destination have been exhausted.
+    //
+    // If (_cloneRecordIdsIter == _cloneLocs.end() && _overflowDocs.empty() && _inProgressReads
+    //     == 0) then all documents have been returned to the destination.
+    decltype(_cloneLocs.size()) _inProgressReads = 0;
+
+    // This condition variable allows us to wait on the following condition:
+    // Either we're done and the above condition is satisfied, or there is some document to
+    // return.
+    stdx::condition_variable _moreDocsCV;
+
+    decltype(_cloneLocs.size()) _numRecordsCloned{0};
+    decltype(_cloneLocs.size()) _numRecordsPassedOver{0};

     // The estimated average object size during the clone phase. Used for buffer size
     // pre-allocation (initial clone).
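
The 16MB constraint described in the comments above is enforced in _nextCloneBatchFromCloneLocs by the check arrBuilder->len() + doc->value().objsize() + 1024 > BSONObjMaxUserSize. A hedged restatement of that arithmetic follows; the helper name is hypothetical, while the constants come from the diff (BSONObjMaxUserSize is 16MB):

    constexpr int kBSONObjMaxUserSize = 16 * 1024 * 1024;  // 16MB response cap
    constexpr int kArrayIndexOverhead = 1024;              // slack for BSONArray index keys

    // True if appending a document of docSize bytes keeps the batch under the cap.
    // An empty batch always accepts the document, which is what guarantees the
    // progress property the comment in _nextCloneBatchFromCloneLocs calls out:
    // every _migrateClone response advances by at least one document.
    bool fitsInCloneBatch(int currentBatchLen, int currentBatchDocs, int docSize) {
        if (currentBatchDocs == 0)
            return true;
        return currentBatchLen + docSize + kArrayIndexOverhead <= kBSONObjMaxUserSize;
    }

A document that fails this check is not dropped; it is parked in the overflow deque so another thread servicing a _migrateClone request can send it.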
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index c062c8e2fe3..91e1b4a21bc 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -66,476 +66,6 @@ const ConnectionString kRecipientConnStr =
                       HostAndPort("RecipientHost2:1234"),
                       HostAndPort("RecipientHost3:1234")});

-class CollectionWithFault : public Collection {
-public:
-    CollectionWithFault(const Collection* originalCollection) : _coll(originalCollection) {}
-
-    void setFindDocStatus(Status newStatus) {
-        _findDocStatus = newStatus;
-    }
-
-    //////////////////////////////////////////////////////////////////////////////////
-    // Collection overrides
-
-    std::shared_ptr<Collection> clone() const override {
-        return _coll->clone();
-    }
-
-    SharedCollectionDecorations* getSharedDecorations() const override {
-        return _coll->getSharedDecorations();
-    }
-
-    const NamespaceString& ns() const override {
-        return _coll->ns();
-    }
-
-    Status rename(OperationContext* opCtx, const NamespaceString& nss, bool stayTemp) override {
-        MONGO_UNREACHABLE;
-    }
-
-    RecordId getCatalogId() const override {
-        return _coll->getCatalogId();
-    }
-
-    UUID uuid() const override {
-        return _coll->uuid();
-    }
-
-    const IndexCatalog* getIndexCatalog() const override {
-        return _coll->getIndexCatalog();
-    }
-
-    IndexCatalog* getIndexCatalog() override {
-        MONGO_UNREACHABLE;
-    }
-
-    RecordStore* getRecordStore() const {
-        return _coll->getRecordStore();
-    }
-
-    std::shared_ptr<Ident> getSharedIdent() const override {
-        return _coll->getSharedIdent();
-    }
-
-    const BSONObj getValidatorDoc() const override {
-        return _coll->getValidatorDoc();
-    }
-
-    std::pair<SchemaValidationResult, Status> checkValidation(
-        OperationContext* opCtx, const BSONObj& document) const override {
-        return _coll->checkValidation(opCtx, document);
-    }
-
-    bool requiresIdIndex() const override {
-        return _coll->requiresIdIndex();
-    }
-
-    Snapshotted<BSONObj> docFor(OperationContext* opCtx, RecordId loc) const override {
-        return _coll->docFor(opCtx, loc);
-    }
-
-    bool findDoc(OperationContext* opCtx, RecordId loc, Snapshotted<BSONObj>* out) const override {
-        uassertStatusOK(_findDocStatus);
-        return _coll->findDoc(opCtx, loc, out);
-    }
-
-    std::unique_ptr<SeekableRecordCursor> getCursor(OperationContext* opCtx,
-                                                    bool forward = true) const override {
-        return _coll->getCursor(opCtx, forward);
-    }
-
-    bool updateWithDamagesSupported() const override {
-        return _coll->updateWithDamagesSupported();
-    }
-
-    Status truncate(OperationContext* opCtx) override {
-        MONGO_UNREACHABLE;
-    }
-
-    Validator parseValidator(OperationContext* opCtx,
-                             const BSONObj& validator,
-                             MatchExpressionParser::AllowedFeatureSet allowedFeatures,
-                             boost::optional<multiversion::FeatureCompatibilityVersion>
-                                 maxFeatureCompatibilityVersion) const override {
-        return _coll->parseValidator(
-            opCtx, validator, allowedFeatures, maxFeatureCompatibilityVersion);
-    }
-
-    void setValidator(OperationContext* opCtx, Validator validator) override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status setValidationLevel(OperationContext* opCtx, ValidationLevelEnum newLevel) override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status setValidationAction(OperationContext* opCtx, ValidationActionEnum newAction) override {
-        MONGO_UNREACHABLE;
-    }
-
-    boost::optional<ValidationLevelEnum> getValidationLevel() const override {
-        return _coll->getValidationLevel();
-    }
-
-    boost::optional<ValidationActionEnum> getValidationAction() const override {
-        return _coll->getValidationAction();
-    }
-
-    Status updateValidator(OperationContext* opCtx,
-                           BSONObj newValidator,
-                           boost::optional<ValidationLevelEnum> newLevel,
-                           boost::optional<ValidationActionEnum> newAction) override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status checkValidatorAPIVersionCompatability(OperationContext* opCtx) const override {
-        return _coll->checkValidatorAPIVersionCompatability(opCtx);
-    }
-
-    bool isChangeStreamPreAndPostImagesEnabled() const override {
-        return _coll->isChangeStreamPreAndPostImagesEnabled();
-    }
-
-    void setChangeStreamPreAndPostImages(OperationContext* opCtx,
-                                         ChangeStreamPreAndPostImagesOptions val) override {
-        MONGO_UNREACHABLE;
-    }
-
-    bool isTemporary() const override {
-        return _coll->isTemporary();
-    }
-
-    boost::optional<bool> getTimeseriesBucketsMayHaveMixedSchemaData() const override {
-        return _coll->getTimeseriesBucketsMayHaveMixedSchemaData();
-    }
-
-    void setTimeseriesBucketsMayHaveMixedSchemaData(OperationContext* opCtx,
-                                                    boost::optional<bool> setting) override {
-        MONGO_UNREACHABLE;
-    }
-
-    bool doesTimeseriesBucketsDocContainMixedSchemaData(const BSONObj& bucketsDoc) const override {
-        return _coll->doesTimeseriesBucketsDocContainMixedSchemaData(bucketsDoc);
-    }
-
-    bool getRequiresTimeseriesExtendedRangeSupport() const override {
-        return _coll->getRequiresTimeseriesExtendedRangeSupport();
-    }
-
-    void setRequiresTimeseriesExtendedRangeSupport(OperationContext* opCtx) const override {
-        return _coll->setRequiresTimeseriesExtendedRangeSupport(opCtx);
-    }
-
-    bool isClustered() const override {
-        return _coll->isClustered();
-    }
-
-    boost::optional<ClusteredCollectionInfo> getClusteredInfo() const override {
-        return _coll->getClusteredInfo();
-    }
-
-    void updateClusteredIndexTTLSetting(OperationContext* opCtx,
-                                        boost::optional<int64_t> expireAfterSeconds) override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status updateCappedSize(OperationContext* opCtx,
-                            boost::optional<long long> newCappedSize,
-                            boost::optional<long long> newCappedMax) override {
-        MONGO_UNREACHABLE;
-    }
-
-    StatusWith<int> checkMetaDataForIndex(const std::string& indexName,
-                                          const BSONObj& spec) const override {
-        return _coll->checkMetaDataForIndex(indexName, spec);
-    }
-
-    void updateTTLSetting(OperationContext* opCtx,
-                          StringData idxName,
-                          long long newExpireSeconds) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void updateHiddenSetting(OperationContext* opCtx, StringData idxName, bool hidden) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void updateUniqueSetting(OperationContext* opCtx, StringData idxName, bool unique) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void updatePrepareUniqueSetting(OperationContext* opCtx,
-                                    StringData idxName,
-                                    bool prepareUnique) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void setIsTemp(OperationContext* opCtx, bool isTemp) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void removeIndex(OperationContext* opCtx, StringData indexName) override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status prepareForIndexBuild(OperationContext* opCtx,
-                                const IndexDescriptor* spec,
-                                boost::optional<UUID> buildUUID,
-                                bool isBackgroundSecondaryBuild) override {
-        MONGO_UNREACHABLE;
-    }
-
-    boost::optional<UUID> getIndexBuildUUID(StringData indexName) const override {
-        return _coll->getIndexBuildUUID(indexName);
-    }
-
-    bool isIndexMultikey(OperationContext* opCtx,
-                         StringData indexName,
-                         MultikeyPaths* multikeyPaths,
-                         int indexOffset = -1) const override {
-        return _coll->isIndexMultikey(opCtx, indexName, multikeyPaths, indexOffset);
-    }
-
-    bool setIndexIsMultikey(OperationContext* opCtx,
-                            StringData indexName,
-                            const MultikeyPaths& multikeyPaths,
-                            int indexOffset = -1) const override {
-        return _coll->setIndexIsMultikey(opCtx, indexName, multikeyPaths, indexOffset);
-    }
-
-    void forceSetIndexIsMultikey(OperationContext* opCtx,
-                                 const IndexDescriptor* desc,
-                                 bool isMultikey,
-                                 const MultikeyPaths& multikeyPaths) const override {
-        return _coll->forceSetIndexIsMultikey(opCtx, desc, isMultikey, multikeyPaths);
-    }
-
-    int getTotalIndexCount() const override {
-        return _coll->getTotalIndexCount();
-    }
-
-    int getCompletedIndexCount() const override {
-        return _coll->getCompletedIndexCount();
-    }
-
-    BSONObj getIndexSpec(StringData indexName) const override {
-        return _coll->getIndexSpec(indexName);
-    }
-
-    void getAllIndexes(std::vector<std::string>* names) const override {
-        return _coll->getAllIndexes(names);
-    }
-
-    void getReadyIndexes(std::vector<std::string>* names) const override {
-        return _coll->getReadyIndexes(names);
-    }
-
-    bool isIndexPresent(StringData indexName) const override {
-        return _coll->isIndexPresent(indexName);
-    }
-
-    bool isIndexReady(StringData indexName) const override {
-        return _coll->isIndexReady(indexName);
-    }
-
-    void replaceMetadata(OperationContext* opCtx,
-                         std::shared_ptr<BSONCollectionCatalogEntry::MetaData> md) override {
-        MONGO_UNREACHABLE;
-    }
-
-    bool isCapped() const override {
-        return _coll->isCapped();
-    }
-
-    long long getCappedMaxDocs() const override {
-        return _coll->getCappedMaxDocs();
-    }
-
-    long long getCappedMaxSize() const override {
-        return _coll->getCappedMaxSize();
-    }
-
-    long long numRecords(OperationContext* opCtx) const override {
-        return _coll->numRecords(opCtx);
-    }
-
-    long long dataSize(OperationContext* opCtx) const override {
-        return _coll->dataSize(opCtx);
-    }
-
-    bool isEmpty(OperationContext* opCtx) const override {
-        return _coll->isEmpty(opCtx);
-    }
-
-    int averageObjectSize(OperationContext* opCtx) const override {
-        return _coll->averageObjectSize(opCtx);
-    }
-
-    uint64_t getIndexSize(OperationContext* opCtx,
-                          BSONObjBuilder* details = nullptr,
-                          int scale = 1) const {
-        return _coll->getIndexSize(opCtx, details, scale);
-    }
-
-    uint64_t getIndexFreeStorageBytes(OperationContext* opCtx) const override {
-        return _coll->getIndexFreeStorageBytes(opCtx);
-    }
-
-    boost::optional<Timestamp> getMinimumVisibleSnapshot() const override {
-        return _coll->getMinimumVisibleSnapshot();
-    }
-
-    void setMinimumVisibleSnapshot(Timestamp name) override {
-        MONGO_UNREACHABLE;
-    }
-
-    boost::optional<TimeseriesOptions> getTimeseriesOptions() const override {
-        return _coll->getTimeseriesOptions();
-    }
-
-    void setTimeseriesOptions(OperationContext* opCtx,
-                              const TimeseriesOptions& tsOptions) override {
-        MONGO_UNREACHABLE;
-    }
-
-    const CollatorInterface* getDefaultCollator() const override {
-        return _coll->getDefaultCollator();
-    }
-
-    const CollectionOptions& getCollectionOptions() const override {
-        return _coll->getCollectionOptions();
-    }
-
-    StatusWith<std::vector<BSONObj>> addCollationDefaultsToIndexSpecsForCreate(
-        OperationContext* opCtx, const std::vector<BSONObj>& indexSpecs) const {
-        return _coll->addCollationDefaultsToIndexSpecsForCreate(opCtx, indexSpecs);
-    }
-
-    void indexBuildSuccess(OperationContext* opCtx, IndexCatalogEntry* index) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void onDeregisterFromCatalog(OperationContext* opCtx) override {
-        MONGO_UNREACHABLE;
-    }
-
-    void deleteDocument(OperationContext* opCtx,
-                        StmtId stmtId,
-                        RecordId loc,
-                        OpDebug* opDebug,
-                        bool fromMigrate = false,
-                        bool noWarn = false,
-                        StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
-                        CheckRecordId checkRecordId = CheckRecordId::Off) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    void deleteDocument(OperationContext* opCtx,
-                        Snapshotted<BSONObj> doc,
-                        StmtId stmtId,
-                        RecordId loc,
-                        OpDebug* opDebug,
-                        bool fromMigrate = false,
-                        bool noWarn = false,
-                        StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
-                        CheckRecordId checkRecordId = CheckRecordId::Off) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status insertDocuments(OperationContext* opCtx,
-                           std::vector<InsertStatement>::const_iterator begin,
-                           std::vector<InsertStatement>::const_iterator end,
-                           OpDebug* opDebug,
-                           bool fromMigrate = false) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status insertDocument(OperationContext* opCtx,
-                          const InsertStatement& doc,
-                          OpDebug* opDebug,
-                          bool fromMigrate = false) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status insertDocumentsForOplog(OperationContext* opCtx,
-                                   std::vector<Record>* records,
-                                   const std::vector<Timestamp>& timestamps) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    Status insertDocumentForBulkLoader(OperationContext* opCtx,
-                                       const BSONObj& doc,
-                                       const OnRecordInsertedFn& onRecordInserted) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    RecordId updateDocument(OperationContext* opCtx,
-                            RecordId oldLocation,
-                            const Snapshotted<BSONObj>& oldDoc,
-                            const BSONObj& newDoc,
-                            bool indexesAffected,
-                            OpDebug* opDebug,
-                            CollectionUpdateArgs* args) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    StatusWith<RecordData> updateDocumentWithDamages(OperationContext* opCtx,
-                                                     RecordId loc,
-                                                     const Snapshotted<RecordData>& oldRec,
-                                                     const char* damageSource,
-                                                     const mutablebson::DamageVector& damages,
-                                                     CollectionUpdateArgs* args) const {
-        MONGO_UNREACHABLE;
-    }
-
-    void cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) const override {
-        MONGO_UNREACHABLE;
-    }
-
-    bool getRecordPreImages() const override {
-        return _coll->getRecordPreImages();
-    }
-
-    void setRecordPreImages(OperationContext* opCtx, bool val) override {
-        MONGO_UNREACHABLE;
-    }
-
-    std::vector<std::string> removeInvalidIndexOptions(OperationContext* opCtx) override {
-        MONGO_UNREACHABLE;
-    }
-
-    CappedCallback* getCappedCallback() override {
-        MONGO_UNREACHABLE;
-    }
-
-    const CappedCallback* getCappedCallback() const override {
-        return _coll->getCappedCallback();
-    }
-
-    std::shared_ptr<CappedInsertNotifier> getCappedInsertNotifier() const override {
-        return _coll->getCappedInsertNotifier();
-    }
-
-    std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> makePlanExecutor(
-        OperationContext* opCtx,
-        const CollectionPtr& yieldableCollection,
-        PlanYieldPolicy::YieldPolicy yieldPolicy,
-        ScanDirection scanDirection,
-        boost::optional<RecordId> resumeAfterRecordId = boost::none) const override {
-        return _coll->makePlanExecutor(
-            opCtx, yieldableCollection, yieldPolicy, scanDirection, resumeAfterRecordId);
-    }
-
-    void establishOplogCollectionForLogging(OperationContext* opCtx) const override {
-        return _coll->establishOplogCollectionForLogging(opCtx);
-    }
-
-private:
-    const Collection* _coll;
-
-    Status _findDocStatus{Status::OK()};
-};
-
 class MigrationChunkClonerSourceLegacyTest : public ShardServerTestFixture {
 protected:
     MigrationChunkClonerSourceLegacyTest() : ShardServerTestFixture(Options{}.useMockClock(true)) {}
@@ -597,29 +127,9 @@ protected:
         if (docs.empty())
             return;

-        std::deque<BSONObj> docsToInsert;
-        std::copy(docs.cbegin(), docs.cend(), std::back_inserter(docsToInsert));
-
-        while (!docsToInsert.empty()) {
-            std::vector<BSONObj> batchToInsert;
-
-            size_t sizeInBatch = 0;
-            while (!docsToInsert.empty()) {
-                auto next = docsToInsert.front();
-                sizeInBatch += next.objsize();
-
-                if (sizeInBatch > BSONObjMaxUserSize) {
-                    break;
-                }
-
-                batchToInsert.push_back(next);
-                docsToInsert.pop_front();
-            }
-
-            auto response = client()->insertAcknowledged(kNss.ns(), batchToInsert);
-            ASSERT_OK(getStatusFromWriteCommandReply(response));
-            ASSERT_GT(response["n"].Int(), 0);
-        }
+        auto response = client()->insertAcknowledged(kNss.ns(), docs);
+        ASSERT_OK(getStatusFromWriteCommandReply(response));
+        ASSERT_GT(response["n"].Int(), 0);
     }

     void deleteDocsInShardedCollection(BSONObj query) {
@@ -1191,164 +701,5 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
     cloner.cancelClone(operationContext());
 }

-TEST_F(MigrationChunkClonerSourceLegacyTest, CloneFetchThatOverflows) {
-    const auto kBigSize = 10 * 1024 * 1024;
-    const std::vector<BSONObj> contents = {createSizedCollectionDocument(100, kBigSize),
-                                           createSizedCollectionDocument(120, kBigSize),
-                                           createSizedCollectionDocument(199, kBigSize)};
-
-    createShardedCollection(contents);
-
-    ShardsvrMoveRange req = createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
-    req.setMaxChunkSizeBytes(64 * 1024 * 1024);
-
-    MigrationChunkClonerSourceLegacy cloner(req,
-                                            WriteConcernOptions(),
-                                            kShardKeyPattern,
-                                            kDonorConnStr,
-                                            kRecipientConnStr.getServers()[0]);
-
-    {
-        auto futureStartClone = launchAsync([&]() {
-            onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
-        });
-
-        ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
-        futureStartClone.default_timed_get();
-    }
-
-    // Ensure the initial clone documents are available
-    {
-        AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
-
-        {
-            BSONArrayBuilder arrBuilder;
-            ASSERT_OK(
-                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-            ASSERT_EQ(1, arrBuilder.arrSize());
-
-            const auto arr = arrBuilder.arr();
-            ASSERT_BSONOBJ_EQ(contents[0], arr[0].Obj());
-        }
-
-        {
-            BSONArrayBuilder arrBuilder;
-            ASSERT_OK(
-                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-            ASSERT_EQ(1, arrBuilder.arrSize());
-
-            const auto arr = arrBuilder.arr();
-            ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
-        }
-
-        {
-            BSONArrayBuilder arrBuilder;
-            ASSERT_OK(
-                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-            ASSERT_EQ(1, arrBuilder.arrSize());
-
-            const auto arr = arrBuilder.arr();
-            ASSERT_BSONOBJ_EQ(contents[2], arr[0].Obj());
-        }
-
-        {
-            BSONArrayBuilder arrBuilder;
-            ASSERT_OK(
-                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-            ASSERT_EQ(0, arrBuilder.arrSize());
-        }
-    }
-
-    auto futureCommit = launchAsync([&]() {
-        onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
-    });
-
-    ASSERT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */));
-    futureCommit.default_timed_get();
-}
-
-TEST_F(MigrationChunkClonerSourceLegacyTest, CloneShouldNotCrashWhenNextCloneBatchThrows) {
-    const std::vector<BSONObj> contents = {createCollectionDocument(100),
-                                           createCollectionDocument(150),
-                                           createCollectionDocument(199)};
-
-    createShardedCollection(contents);
-
-    const ShardsvrMoveRange req =
-        createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
-    MigrationChunkClonerSourceLegacy cloner(req,
-                                            WriteConcernOptions(),
-                                            kShardKeyPattern,
-                                            kDonorConnStr,
-                                            kRecipientConnStr.getServers()[0]);
-
-    {
-        auto futureStartClone = launchAsync([&]() {
-            onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
-        });
-
-        ASSERT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
-        futureStartClone.default_timed_get();
-    }
-
-    {
-        AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
-
-        {
-            auto collWithFault =
-                std::make_unique<CollectionWithFault>(autoColl.getCollection().get());
-            CollectionPtr collPtrWithFault(collWithFault.get(), CollectionPtr::NoYieldTag());
-
-            // Note: findDoc currently doesn't have any interruption points; this test simulates
-            // an exception being thrown while it is being called.
-            collWithFault->setFindDocStatus({ErrorCodes::Interrupted, "fake interrupt"});
-
-            BSONArrayBuilder arrBuilder;
-
-            ASSERT_THROWS_CODE(
-                cloner.nextCloneBatch(operationContext(), collPtrWithFault, &arrBuilder),
-                DBException,
-                ErrorCodes::Interrupted);
-            ASSERT_EQ(0, arrBuilder.arrSize());
-        }
-
-        // The first document was lost and returned an error during nextCloneBatch. This would
-        // cause the migration destination to abort, but it is still possible for other
-        // threads to be in the middle of calling nextCloneBatch. The next nextCloneBatch
-        // calls simulate calls from other threads after the first call threw.
-        {
-            BSONArrayBuilder arrBuilder;
-            ASSERT_OK(
-                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-
-            const auto arr = arrBuilder.arr();
-            ASSERT_EQ(2, arrBuilder.arrSize());
-
-            ASSERT_BSONOBJ_EQ(contents[1], arr[0].Obj());
-            ASSERT_BSONOBJ_EQ(contents[2], arr[1].Obj());
-        }
-
-        {
-            BSONArrayBuilder arrBuilder;
-            ASSERT_OK(
-                cloner.nextCloneBatch(operationContext(), autoColl.getCollection(), &arrBuilder));
-
-            const auto arr = arrBuilder.arr();
-            ASSERT_EQ(0, arrBuilder.arrSize());
-        }
-    }
-
-    auto futureCommit = launchAsync([&]() {
-        // Simulate destination returning an error.
-        onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << false); });
-
-        // This is the return response for recvChunkAbort.
-        onCommand([&](const RemoteCommandRequest& request) { return BSON("ok" << true); });
-    });
-
-    ASSERT_NOT_OK(cloner.commitClone(operationContext(), true /* acquireCSOnRecipient */));
-    futureCommit.default_timed_get();
-}
-
 } // namespace
 } // namespace mongo
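
The removed CloneShouldNotCrashWhenNextCloneBatchThrows test above exercises the error path that motivates the ON_BLOCK_EXIT in _nextCloneBatchFromCloneLocs: if findDoc throws after a record id has been handed out, the in-progress count must still be released and a waiter notified, or threads blocked in _getNextDoc could wait forever for _inProgressReads to reach zero. A sketch of that obligation, reusing the hypothetical CloneQueue from the first example (OnBlockExit and readDocument are likewise hypothetical stand-ins):

    #include <functional>

    // Minimal stand-in for mongo's ON_BLOCK_EXIT scope guard.
    struct OnBlockExit {
        std::function<void()> fn;
        ~OnBlockExit() { fn(); }
    };

    void readDocument(int id);  // hypothetical I/O that may throw, like the faulted findDoc above

    void processOne(CloneQueue& queue, int id) {
        OnBlockExit guard{[&] { queue.finishOne(); }};  // runs even if readDocument throws
        readDocument(id);
        // ... append the document to the batch, or queue.insertOverflow(id) if full ...
    }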