path: root/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp  98
1 file changed, 49 insertions(+), 49 deletions(-)
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index ac6b513a049..9354f60b8e1 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -90,10 +90,10 @@ BSONObj createRequestWithSessionId(StringData commandName,
*/
class DeleteNotificationStage final : public PlanStage {
public:
- DeleteNotificationStage(MigrationChunkClonerSourceLegacy* cloner, OperationContext* txn)
- : PlanStage("SHARDING_NOTIFY_DELETE", txn), _cloner(cloner) {}
+ DeleteNotificationStage(MigrationChunkClonerSourceLegacy* cloner, OperationContext* opCtx)
+ : PlanStage("SHARDING_NOTIFY_DELETE", opCtx), _cloner(cloner) {}
- void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) override {
+ void doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) override {
if (type == INVALIDATION_DELETION) {
stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex);
_cloner->_cloneLocs.erase(dl);
@@ -182,12 +182,12 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(!_deleteNotifyExec);
}
-Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) {
+Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
invariant(_state == kNew);
- invariant(!txn->lockState()->isLocked());
+ invariant(!opCtx->lockState()->isLocked());
// Load the ids of the currently available documents
- auto storeCurrentLocsStatus = _storeCurrentLocs(txn);
+ auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx);
if (!storeCurrentLocsStatus.isOK()) {
return storeCurrentLocsStatus;
}
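The comment above ("Load the ids of the currently available documents") points at _storeCurrentLocs, whose body appears later in this diff: it runs one index scan over the chunk's shard-key range and keeps only RecordIds, so clone batches can later fetch documents by location instead of re-querying the range. A minimal sketch of the collection step, assuming an executor like the one the real code builds below and eliding yield and error handling:

    // Sketch only: keep the RecordId of each document currently in the range.
    // `exec` stands for the index-scan PlanExecutor set up later in this diff.
    std::set<RecordId> cloneLocs;
    BSONObj obj;
    RecordId recordId;
    while (PlanExecutor::ADVANCED == exec->getNext(&obj, &recordId)) {
        cloneLocs.insert(recordId);  // Locations only; documents are fetched at clone time.
    }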
@@ -223,9 +223,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) {
}
Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
- OperationContext* txn, Milliseconds maxTimeToWait) {
+ OperationContext* opCtx, Milliseconds maxTimeToWait) {
invariant(_state == kCloning);
- invariant(!txn->lockState()->isLocked());
+ invariant(!opCtx->lockState()->isLocked());
const auto startTime = Date_t::now();
@@ -297,7 +297,7 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
"Aborting migration because of high memory usage"};
}
- Status interruptStatus = txn->checkForInterruptNoAssert();
+ Status interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
return interruptStatus;
}
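For orientation, the fragments in this and the surrounding hunks belong to a polling loop: awaitUntilCriticalSectionIsAppropriate repeatedly asks the recipient shard for its clone progress, gives up early if untransferred modifications exceed a memory bound or the operation is interrupted, and times out after maxTimeToWait. A compressed sketch of that control flow, where recipientIsCaughtUp and memoryUsageTooHigh are hypothetical stand-ins for the real recipient round-trip and bookkeeping:

    // Control-flow sketch only; the helper predicates are illustrative stand-ins.
    Status waitForCatchUp(OperationContext* opCtx, Milliseconds maxTimeToWait) {
        const auto startTime = Date_t::now();
        while (Date_t::now() - startTime < maxTimeToWait) {
            if (recipientIsCaughtUp())  // Recipient reports a steady state.
                return Status::OK();

            if (memoryUsageTooHigh())
                return {ErrorCodes::ExceededMemoryLimit,
                        "Aborting migration because of high memory usage"};

            Status interruptStatus = opCtx->checkForInterruptNoAssert();
            if (!interruptStatus.isOK())
                return interruptStatus;

            sleepmillis(100);  // Back off before polling again (interval illustrative).
        }
        return {ErrorCodes::ExceededTimeLimit,
                "Timed out waiting for the cloner to catch up"};
    }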
@@ -306,23 +306,23 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"};
}
-Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) {
+Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
invariant(_state == kCloning);
- invariant(!txn->lockState()->isLocked());
+ invariant(!opCtx->lockState()->isLocked());
auto responseStatus =
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
- _cleanup(txn);
+ _cleanup(opCtx);
return Status::OK();
}
- cancelClone(txn);
+ cancelClone(opCtx);
return responseStatus.getStatus();
}
-void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
switch (_state) {
case kDone:
@@ -331,21 +331,21 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) {
_callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId));
// Intentional fall through
case kNew:
- _cleanup(txn);
+ _cleanup(opCtx);
break;
default:
MONGO_UNREACHABLE;
}
}
-bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* txn,
+bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* opCtx,
const BSONObj& doc) {
return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern);
}
-void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* txn,
+void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
const BSONObj& insertedDoc) {
- dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
if (idElement.eoo()) {
@@ -358,12 +358,12 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* txn,
return;
}
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
+ opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
}
-void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* txn,
+void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
const BSONObj& updatedDoc) {
- dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
if (idElement.eoo()) {
@@ -376,12 +376,12 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* txn,
return;
}
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
+ opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
}
-void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* txn,
+void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
const BSONObj& deletedDocId) {
- dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
if (idElement.eoo()) {
@@ -390,7 +390,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* txn,
return;
}
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
+ opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
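One structural point about the three onInsertOp/onUpdateOp/onDeleteOp hooks above: none of them touches the transfer queues directly. Each registers a change with the RecoveryUnit, so the document id reaches _reload or _deleted only if the enclosing write commits; a rolled-back write leaves the cloner untouched. A hedged sketch of that commit/rollback pattern (the real class is LogOpForShardingHandler; Queues and push are illustrative stand-ins for the cloner's internals):

    // Sketch of a RecoveryUnit::Change that defers cloner bookkeeping to commit time.
    struct Queues {
        void push(const BSONObj& idObj, char op);  // 'i'/'u' -> _reload, 'd' -> _deleted
    };

    class LogOpHandlerSketch final : public RecoveryUnit::Change {
    public:
        LogOpHandlerSketch(Queues* queues, BSONObj idObj, char op)
            : _queues(queues), _idObj(idObj.getOwned()), _op(op) {}

        void commit() override {
            // Runs only if the write commits; the op now becomes visible
            // to nextModsBatch().
            _queues->push(_idObj, _op);
        }

        void rollback() override {
            // The write never happened, so there is nothing to transfer.
        }

    private:
        Queues* const _queues;
        const BSONObj _idObj;
        const char _op;
    };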
@@ -400,12 +400,12 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
_averageObjectSizeForCloneLocs * _cloneLocs.size());
}
-Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn,
+Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
Collection* collection,
BSONArrayBuilder* arrBuilder) {
- dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS));
- ElapsedTracker tracker(txn->getServiceContext()->getFastClockSource(),
+ ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(),
internalQueryExecYieldIterations.load(),
Milliseconds(internalQueryExecYieldPeriodMS.load()));
@@ -421,7 +421,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn,
}
Snapshotted<BSONObj> doc;
- if (collection->findDoc(txn, *it, &doc)) {
+ if (collection->findDoc(opCtx, *it, &doc)) {
// Use the builder size instead of accumulating the document sizes directly so that we
// take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
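The comment in this hunk is worth unpacking: every element appended to a BSONArrayBuilder also costs a type byte plus its stringified decimal index, so summing doc.objsize() alone undercounts the batch and could let it exceed the BSON size limit. Checking the builder's own length accounts for that framing. A standalone illustration using only the builder APIs already present in this file:

    // The builder's length exceeds the sum of element sizes because each array
    // slot carries per-element framing (type byte + index string + NUL).
    BSONArrayBuilder arrBuilder;
    long long summedSizes = 0;
    for (int i = 0; i < 1000; i++) {
        BSONObj doc = BSON("_id" << i);
        arrBuilder.append(doc);
        summedSizes += doc.objsize();
    }
    invariant(arrBuilder.len() > summedSizes);  // Size checks must use the builder.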
@@ -444,10 +444,10 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn,
return Status::OK();
}
-Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn,
+Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
Database* db,
BSONObjBuilder* builder) {
- dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS));
stdx::lock_guard<stdx::mutex> sl(_mutex);
@@ -456,15 +456,15 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn,
long long docSizeAccumulator = 0;
- _xfer(txn, db, &_deleted, builder, "deleted", &docSizeAccumulator, false);
- _xfer(txn, db, &_reload, builder, "reload", &docSizeAccumulator, true);
+ _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false);
+ _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true);
builder->append("size", docSizeAccumulator);
return Status::OK();
}
-void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) {
+void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) {
{
stdx::lock_guard<stdx::mutex> sl(_mutex);
_state = kDone;
@@ -473,8 +473,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) {
}
if (_deleteNotifyExec) {
- ScopedTransaction scopedXact(txn, MODE_IS);
- AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS);
+ ScopedTransaction scopedXact(opCtx, MODE_IS);
+ AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
_deleteNotifyExec.reset();
}
@@ -510,9 +510,9 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO
return responseStatus.data.getOwned();
}
-Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn) {
- ScopedTransaction scopedXact(txn, MODE_IS);
- AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS);
+Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) {
+ ScopedTransaction scopedXact(opCtx, MODE_IS);
+ AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
Collection* const collection = autoColl.getCollection();
if (!collection) {
@@ -523,7 +523,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
// Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any
// multi-key index prefixed by shard key cannot be multikey over the shard key fields.
IndexDescriptor* const idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
+ collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx,
_shardKeyPattern.toBSON(),
false); // requireSingleKey
if (!idx) {
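The multikey reasoning above deserves a concrete case. A shard key must be single-valued per document, so an index such as {a: 1, tags: 1} may be multikey because of an array field tags, yet it can never be multikey over the shard-key prefix a; passing requireSingleKey = false is therefore safe. Illustrated with hypothetical documents:

    // Shard key: {a: 1}.  Candidate index: {a: 1, tags: 1}.
    // Legal: "a" is scalar, "tags" is an array; the index is multikey over
    // "tags", but each entry still has exactly one value for "a".
    BSONObj ok = BSON("a" << 5 << "tags" << BSON_ARRAY("x" << "y"));
    // Not legal under this shard key: "a" itself is an array, which the
    // sharding layer rejects on insert, so the cloner never sees it.
    BSONObj bad = BSON("a" << BSON_ARRAY(1 << 2) << "tags" << "x");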
@@ -535,9 +535,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
// Install the stage, which will listen for notifications on the collection
auto statusWithDeleteNotificationPlanExecutor =
- PlanExecutor::make(txn,
+ PlanExecutor::make(opCtx,
stdx::make_unique<WorkingSet>(),
- stdx::make_unique<DeleteNotificationStage>(this, txn),
+ stdx::make_unique<DeleteNotificationStage>(this, opCtx),
collection,
PlanExecutor::YIELD_MANUAL);
if (!statusWithDeleteNotificationPlanExecutor.isOK()) {
@@ -554,7 +554,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false));
std::unique_ptr<PlanExecutor> exec(
- InternalPlanner::indexScan(txn,
+ InternalPlanner::indexScan(opCtx,
collection,
idx,
min,
@@ -572,9 +572,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
unsigned long long maxRecsWhenFull;
long long avgRecSize;
- const long long totalRecs = collection->numRecords(txn);
+ const long long totalRecs = collection->numRecords(opCtx);
if (totalRecs > 0) {
- avgRecSize = collection->dataSize(txn) / totalRecs;
+ avgRecSize = collection->dataSize(opCtx) / totalRecs;
maxRecsWhenFull = _args.getMaxChunkSizeBytes() / avgRecSize;
maxRecsWhenFull = std::min((unsigned long long)(kMaxObjectPerChunk + 1),
130 * maxRecsWhenFull / 100 /* slack */);
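A worked example of the sizing math above, under assumed inputs (64 MB max chunk size, 1 KB average record; kMaxObjectPerChunk taken at its usual 250000):

    const unsigned long long maxChunkSizeBytes = 64 * 1024 * 1024;  // assumed
    const long long avgRecSize = 1024;                              // assumed

    unsigned long long maxRecsWhenFull = maxChunkSizeBytes / avgRecSize;  // 65536
    maxRecsWhenFull = std::min((unsigned long long)(kMaxObjectPerChunk + 1),
                               130 * maxRecsWhenFull / 100 /* slack */);  // 85196

So with these numbers the chunk is declared too large to migrate only once the scan sees more than 85196 records, roughly 30% past the nominal 64 MB budget.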
@@ -610,7 +610,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
<< WorkingSetCommon::toStatusString(obj)};
}
- const uint64_t collectionAverageObjectSize = collection->averageObjectSize(txn);
+ const uint64_t collectionAverageObjectSize = collection->averageObjectSize(opCtx);
if (isLargeChunk) {
return {
@@ -638,7 +638,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn
return Status::OK();
}
-void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* txn,
+void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
Database* db,
std::list<BSONObj>* docIdList,
BSONObjBuilder* builder,
@@ -660,7 +660,7 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* txn,
BSONObj idDoc = *docIdIter;
if (explode) {
BSONObj fullDoc;
- if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) {
+ if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) {
arr.append(fullDoc);
*sizeAccumulator += fullDoc.objsize();
}
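Finally, the explode flag that _xfer branches on here determines how much is re-sent: deleted entries ship only the _id document, while reload entries are re-fetched in full by _id so the recipient applies current contents. A sketch of the resulting mods batch, with illustrative values and the same subarray-builder pattern the real code uses:

    // Assembles { deleted: [...], reload: [...], size: N } as nextModsBatch does.
    BSONObjBuilder builder;
    {
        BSONArrayBuilder deleted(builder.subarrayStart("deleted"));
        deleted.append(BSON("_id" << 7));  // id only; recipient deletes by _id
        deleted.done();
    }
    {
        BSONArrayBuilder reload(builder.subarrayStart("reload"));
        reload.append(BSON("_id" << 9 << "status" << "new"));  // full document
        reload.done();
    }
    builder.append("size", 123LL);  // accumulated BSON size in bytes (illustrative)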