diff options
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 98 |
1 files changed, 49 insertions, 49 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index ac6b513a049..9354f60b8e1 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -90,10 +90,10 @@ BSONObj createRequestWithSessionId(StringData commandName, */ class DeleteNotificationStage final : public PlanStage { public: - DeleteNotificationStage(MigrationChunkClonerSourceLegacy* cloner, OperationContext* txn) - : PlanStage("SHARDING_NOTIFY_DELETE", txn), _cloner(cloner) {} + DeleteNotificationStage(MigrationChunkClonerSourceLegacy* cloner, OperationContext* opCtx) + : PlanStage("SHARDING_NOTIFY_DELETE", opCtx), _cloner(cloner) {} - void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) override { + void doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) override { if (type == INVALIDATION_DELETION) { stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); _cloner->_cloneLocs.erase(dl); @@ -182,12 +182,12 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(!_deleteNotifyExec); } -Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { +Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { invariant(_state == kNew); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); // Load the ids of the currently available documents - auto storeCurrentLocsStatus = _storeCurrentLocs(txn); + auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx); if (!storeCurrentLocsStatus.isOK()) { return storeCurrentLocsStatus; } @@ -223,9 +223,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { } Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( - OperationContext* txn, Milliseconds maxTimeToWait) { + OperationContext* opCtx, Milliseconds maxTimeToWait) { invariant(_state == kCloning); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); const auto startTime = Date_t::now(); @@ -297,7 +297,7 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( "Aborting migration because of high memory usage"}; } - Status interruptStatus = txn->checkForInterruptNoAssert(); + Status interruptStatus = opCtx->checkForInterruptNoAssert(); if (!interruptStatus.isOK()) { return interruptStatus; } @@ -306,23 +306,23 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"}; } -Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) { +Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { invariant(_state == kCloning); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); auto responseStatus = _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { - _cleanup(txn); + _cleanup(opCtx); return Status::OK(); } - cancelClone(txn); + cancelClone(opCtx); return responseStatus.getStatus(); } -void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); switch (_state) { case kDone: @@ -331,21 +331,21 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) { _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId)); // Intentional fall through case kNew: - _cleanup(txn); + _cleanup(opCtx); break; default: MONGO_UNREACHABLE; } } -bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* txn, +bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc) { return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern); } -void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; if (idElement.eoo()) { @@ -358,12 +358,12 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* txn, return; } - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); } -void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; if (idElement.eoo()) { @@ -376,12 +376,12 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* txn, return; } - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); } -void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; if (idElement.eoo()) { @@ -390,7 +390,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* txn, return; } - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -400,12 +400,12 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { _averageObjectSizeForCloneLocs * _cloneLocs.size()); } -Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, +Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, Collection* collection, BSONArrayBuilder* arrBuilder) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); - ElapsedTracker tracker(txn->getServiceContext()->getFastClockSource(), + ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(), internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); @@ -421,7 +421,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, } Snapshotted<BSONObj> doc; - if (collection->findDoc(txn, *it, &doc)) { + if (collection->findDoc(opCtx, *it, &doc)) { // Use the builder size instead of accumulating the document sizes directly so that we // take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && @@ -444,10 +444,10 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, return Status::OK(); } -Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn, +Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); stdx::lock_guard<stdx::mutex> sl(_mutex); @@ -456,15 +456,15 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn, long long docSizeAccumulator = 0; - _xfer(txn, db, &_deleted, builder, "deleted", &docSizeAccumulator, false); - _xfer(txn, db, &_reload, builder, "reload", &docSizeAccumulator, true); + _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false); + _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true); builder->append("size", docSizeAccumulator); return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) { +void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) { { stdx::lock_guard<stdx::mutex> sl(_mutex); _state = kDone; @@ -473,8 +473,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) { } if (_deleteNotifyExec) { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS); + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); _deleteNotifyExec.reset(); } @@ -510,9 +510,9 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO return responseStatus.data.getOwned(); } -Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn) { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS); +Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) { + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); Collection* const collection = autoColl.getCollection(); if (!collection) { @@ -523,7 +523,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any // multi-key index prefixed by shard key cannot be multikey over the shard key fields. IndexDescriptor* const idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, + collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, _shardKeyPattern.toBSON(), false); // requireSingleKey if (!idx) { @@ -535,9 +535,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn // Install the stage, which will listen for notifications on the collection auto statusWithDeleteNotificationPlanExecutor = - PlanExecutor::make(txn, + PlanExecutor::make(opCtx, stdx::make_unique<WorkingSet>(), - stdx::make_unique<DeleteNotificationStage>(this, txn), + stdx::make_unique<DeleteNotificationStage>(this, opCtx), collection, PlanExecutor::YIELD_MANUAL); if (!statusWithDeleteNotificationPlanExecutor.isOK()) { @@ -554,7 +554,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false)); std::unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(txn, + InternalPlanner::indexScan(opCtx, collection, idx, min, @@ -572,9 +572,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn unsigned long long maxRecsWhenFull; long long avgRecSize; - const long long totalRecs = collection->numRecords(txn); + const long long totalRecs = collection->numRecords(opCtx); if (totalRecs > 0) { - avgRecSize = collection->dataSize(txn) / totalRecs; + avgRecSize = collection->dataSize(opCtx) / totalRecs; maxRecsWhenFull = _args.getMaxChunkSizeBytes() / avgRecSize; maxRecsWhenFull = std::min((unsigned long long)(kMaxObjectPerChunk + 1), 130 * maxRecsWhenFull / 100 /* slack */); @@ -610,7 +610,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn << WorkingSetCommon::toStatusString(obj)}; } - const uint64_t collectionAverageObjectSize = collection->averageObjectSize(txn); + const uint64_t collectionAverageObjectSize = collection->averageObjectSize(opCtx); if (isLargeChunk) { return { @@ -638,7 +638,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, Database* db, std::list<BSONObj>* docIdList, BSONObjBuilder* builder, @@ -660,7 +660,7 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* txn, BSONObj idDoc = *docIdIter; if (explode) { BSONObj fullDoc; - if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) { + if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) { arr.append(fullDoc); *sizeAccumulator += fullDoc.objsize(); } |