diff options
author | Randolph Tan <randolph@10gen.com> | 2019-03-25 16:44:32 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2019-04-03 16:08:25 -0400 |
commit | 47153dbf7aebeba9a5b9086a82709adfa6fd7226 (patch) | |
tree | 79fbadde42332add71593f572be0ea9cf4ec000a /src | |
parent | dd589aa07a0155cdeaa70f0403466aabcfaa5186 (diff) | |
download | mongo-47153dbf7aebeba9a5b9086a82709adfa6fd7226.tar.gz |
SERVER-40301 Don't hold mutex while doing queries in nextCloneBatch and nextModsBatch
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 121 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 39 |
2 files changed, 103 insertions, 57 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 393a99c00fa..bd599fc74b1 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -516,21 +516,24 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); - stdx::lock_guard<stdx::mutex> sl(_mutex); - - std::set<RecordId>::iterator it; + stdx::unique_lock<stdx::mutex> lk(_mutex); + auto iter = _cloneLocs.begin(); - for (it = _cloneLocs.begin(); it != _cloneLocs.end(); ++it) { + for (; iter != _cloneLocs.end(); ++iter) { // We must always make progress in this method by at least one document because empty return // indicates there is no more initial clone data. if (arrBuilder->arrSize() && tracker.intervalHasElapsed()) { break; } + auto nextRecordId = *iter; + + lk.unlock(); + Snapshotted<BSONObj> doc; - if (collection->findDoc(opCtx, *it, &doc)) { - // Use the builder size instead of accumulating the document sizes directly so that we - // take into consideration the overhead of BSONArray indices. + if (collection->findDoc(opCtx, nextRecordId, &doc)) { + // Use the builder size instead of accumulating the document sizes directly so + // that we take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && (arrBuilder->len() + doc.value().objsize() + 1024) > BSONObjMaxUserSize) { break; @@ -539,9 +542,11 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, arrBuilder->append(doc.value()); ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); } + + lk.lock(); } - _cloneLocs.erase(_cloneLocs.begin(), it); + _cloneLocs.erase(_cloneLocs.begin(), iter); return Status::OK(); } @@ -551,17 +556,33 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, BSONObjBuilder* builder) { dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS)); - stdx::lock_guard<stdx::mutex> sl(_mutex); + std::list<BSONObj> deleteList; + std::list<BSONObj> updateList; - // All clone data must have been drained before starting to fetch the incremental changes - invariant(_cloneLocs.empty()); + { + // All clone data must have been drained before starting to fetch the incremental changes. + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_cloneLocs.empty()); - long long docSizeAccumulator = 0; + // The "snapshot" for delete and update list must be taken under a single lock. This is to + // ensure that we will preserve the causal order of writes. Always consume the delete + // buffer first, before the update buffer. If the delete is causally before the update to + // the same doc, then there's no problem since we consume the delete buffer first. If the + // delete is causally after, we will not be able to see the document when we attempt to + // fetch it, so it's also ok. + deleteList.splice(deleteList.cbegin(), _deleted); + updateList.splice(updateList.cbegin(), _reload); + } + + auto totalDocSize = _xferDeletes(builder, &deleteList, 0); + totalDocSize = _xferUpdates(opCtx, db, builder, &updateList, totalDocSize); - _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false); - _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true); + builder->append("size", totalDocSize); - builder->append("size", docSizeAccumulator); + // Put back remaining ids we didn't consume + stdx::unique_lock<stdx::mutex> lk(_mutex); + _deleted.splice(_deleted.cbegin(), deleteList); + _reload.splice(_reload.cbegin(), updateList); return Status::OK(); } @@ -719,41 +740,61 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, - Database* db, - std::list<BSONObj>* docIdList, - BSONObjBuilder* builder, - const char* fieldName, - long long* sizeAccumulator, - bool explode) { +long long MigrationChunkClonerSourceLegacy::_xferDeletes(BSONObjBuilder* builder, + std::list<BSONObj>* removeList, + long long initialSize) { const long long maxSize = 1024 * 1024; - if (docIdList->size() == 0 || *sizeAccumulator > maxSize) { - return; + if (removeList->empty() || initialSize > maxSize) { + return initialSize; } - const std::string& ns = _args.getNss().ns(); + long long totalSize = initialSize; + BSONArrayBuilder arr(builder->subarrayStart("deleted")); - BSONArrayBuilder arr(builder->subarrayStart(fieldName)); - - std::list<BSONObj>::iterator docIdIter = docIdList->begin(); - while (docIdIter != docIdList->end() && *sizeAccumulator < maxSize) { + auto docIdIter = removeList->begin(); + for (; docIdIter != removeList->end() && totalSize < maxSize; ++docIdIter) { BSONObj idDoc = *docIdIter; - if (explode) { - BSONObj fullDoc; - if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) { - arr.append(fullDoc); - *sizeAccumulator += fullDoc.objsize(); - } - } else { - arr.append(idDoc); - *sizeAccumulator += idDoc.objsize(); - } + arr.append(idDoc); + totalSize += idDoc.objsize(); + } + + removeList->erase(removeList->begin(), docIdIter); + + arr.done(); + return totalSize; +} + +long long MigrationChunkClonerSourceLegacy::_xferUpdates(OperationContext* opCtx, + Database* db, + BSONObjBuilder* builder, + std::list<BSONObj>* updateList, + long long initialSize) { + const long long maxSize = 1024 * 1024; - docIdIter = docIdList->erase(docIdIter); + if (updateList->empty() || initialSize > maxSize) { + return initialSize; } + const auto& nss = _args.getNss(); + BSONArrayBuilder arr(builder->subarrayStart("reload")); + long long totalSize = initialSize; + + auto iter = updateList->begin(); + for (; iter != updateList->end() && totalSize < maxSize; ++iter) { + auto idDoc = *iter; + + BSONObj fullDoc; + if (Helpers::findById(opCtx, db, nss.ns().c_str(), idDoc, fullDoc)) { + arr.append(fullDoc); + totalSize += fullDoc.objsize(); + } + } + + updateList->erase(updateList->begin(), iter); + arr.done(); + return totalSize; } boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 3874f03e76a..1fd34944a8b 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -120,7 +120,8 @@ public: /** * Called by the recipient shard. Populates the passed BSONArrayBuilder with a set of documents, - * which are part of the initial clone sequence. + * which are part of the initial clone sequence. Assumes that there is only one active caller + * to this method at a time (otherwise, it can cause corruption/crash). * * Returns OK status on success. If there were documents returned in the result argument, this * method should be called more times until the result is empty. If it returns failure, it is @@ -201,22 +202,6 @@ private: */ Status _storeCurrentLocs(OperationContext* opCtx); - /** - * Insert items from docIdList to a new array with the given fieldName in the given builder. If - * explode is true, the inserted object will be the full version of the document. Note that - * whenever an item from the docList is inserted to the array, it will also be removed from - * docList. - * - * Should be holding the collection lock for ns if explode is true. - */ - void _xfer(OperationContext* opCtx, - Database* db, - std::list<BSONObj>* docIdList, - BSONObjBuilder* builder, - const char* fieldName, - long long* sizeAccumulator, - bool explode); - /* * Consumes the operation track request and appends the relevant document changes to * the appropriate internal data structures (known colloquially as the 'transfer mods queue'). @@ -266,6 +251,26 @@ private: */ void _drainAllOutstandingOperationTrackRequests(stdx::unique_lock<stdx::mutex>& lk); + /** + * Appends to the builder the list of _id of documents that were deleted during migration. + * Entries appended to the builder are removed from the list. + * Returns the total size of the documents that were appended + initialSize. + */ + long long _xferDeletes(BSONObjBuilder* builder, + std::list<BSONObj>* removeList, + long long initialSize); + + /** + * Appends to the builder the list of full documents that were modified/inserted during the + * migration. Entries appended to the builder are removed from the list. + * Returns the total size of the documents that were appended + initialSize. + */ + long long _xferUpdates(OperationContext* opCtx, + Database* db, + BSONObjBuilder* builder, + std::list<BSONObj>* updateList, + long long initialSize); + // The original move chunk request const MoveChunkRequest _args; |