author     Maria van Keulen <maria@mongodb.com>    2017-03-07 12:00:08 -0500
committer  Maria van Keulen <maria@mongodb.com>    2017-03-07 12:00:08 -0500
commit     589a5c169ced8f6e9ddcd3d182ae1b75db6b7d79 (patch)
tree       c7a090ffdd56a91ae677e2492c61b820af44f964 /src/mongo/db/s
parent     3cba97198638df3750e3b455e2ad57af7ee536ae (diff)
download   mongo-589a5c169ced8f6e9ddcd3d182ae1b75db6b7d79.tar.gz
SERVER-27938 Rename all OperationContext variables to opCtx
This commit is an automated rename of all whole-word instances of txn,
_txn, and txnPtr to opCtx, _opCtx, and opCtxPtr, respectively, in all
.cpp and .h files in src/mongo.
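
The rename is purely textual: each whole-word occurrence of the old identifier is replaced, so identifiers that merely contain "txn" as a substring (for example, a hypothetical txnCount) are left alone. The commit message does not say which tool performed the rename; the following minimal C++ sketch only illustrates the whole-word substitution itself, and the input file name in main() is hypothetical.

    #include <fstream>
    #include <iostream>
    #include <regex>
    #include <sstream>
    #include <string>
    #include <utility>

    // Apply the commit's three whole-word renames. The \b word-boundary anchors
    // ensure that only standalone identifiers match: \btxn\b rewrites "txn" but
    // not "txnCount", because 'C' is a word character and so no boundary follows.
    std::string renameOperationContextVars(std::string source) {
        static const std::pair<const char*, const char*> kRules[] = {
            {R"(\btxnPtr\b)", "opCtxPtr"},
            {R"(\b_txn\b)", "_opCtx"},
            {R"(\btxn\b)", "opCtx"},
        };
        for (const auto& rule : kRules) {
            source = std::regex_replace(source, std::regex(rule.first), rule.second);
        }
        return source;
    }

    int main() {
        // Hypothetical single-file input; a real run would visit every .cpp and
        // .h file under src/mongo and rewrite it in place.
        std::ifstream in("example.cpp");
        std::stringstream buffer;
        buffer << in.rdbuf();
        std::cout << renameOperationContextVars(buffer.str());
        return 0;
    }
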
Diffstat (limited to 'src/mongo/db/s')
71 files changed, 944 insertions(+), 928 deletions(-)
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 53f250ef9fd..91aa5dd7bf4 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -90,7 +90,7 @@ boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkN
     return boost::none;
 }
 
-BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContext* txn) {
+BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContext* opCtx) {
     boost::optional<NamespaceString> nss;
     {
         stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -106,9 +106,9 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex
     // desireable for reporting, and then diagnosing, migrations that are stuck.
     if (nss) {
         // Lock the collection so nothing changes while we're getting the migration report.
-        AutoGetCollection autoColl(txn, nss.get(), MODE_IS);
+        AutoGetCollection autoColl(opCtx, nss.get(), MODE_IS);
 
-        auto css = CollectionShardingState::get(txn, nss.get());
+        auto css = CollectionShardingState::get(opCtx, nss.get());
         if (css && css->getMigrationSourceManager()) {
             return css->getMigrationSourceManager()->getMigrationStatusReport();
         }
@@ -187,9 +187,9 @@ void ScopedRegisterDonateChunk::complete(Status status) {
     _completionNotification->set(status);
 }
 
-Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* txn) {
+Status ScopedRegisterDonateChunk::waitForCompletion(OperationContext* opCtx) {
     invariant(!_forUnregister);
-    return _completionNotification->get(txn);
+    return _completionNotification->get(opCtx);
 }
 
 ScopedRegisterReceiveChunk::ScopedRegisterReceiveChunk(ActiveMigrationsRegistry* registry)
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index 971b020626d..982cb2f8b26 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -92,7 +92,7 @@ public:
      *
      * Takes an IS lock on the namespace of the active migration, if one is active.
      */
-    BSONObj getActiveMigrationStatusReport(OperationContext* txn);
+    BSONObj getActiveMigrationStatusReport(OperationContext* opCtx);
 
 private:
     friend class ScopedRegisterDonateChunk;
@@ -194,7 +194,7 @@ public:
      * Must only be called if the object is in the 'join' mode. Blocks until the main executor of
      * the moveChunk command calls complete.
      */
-    Status waitForCompletion(OperationContext* txn);
+    Status waitForCompletion(OperationContext* opCtx);
 
 private:
     // Registry from which to unregister the migration. Not owned.
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index fd798ecd665..60a765cf5d6 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -186,12 +186,12 @@ Balancer* Balancer::get(OperationContext* operationContext) {
     return get(operationContext->getServiceContext());
 }
 
-void Balancer::initiateBalancer(OperationContext* txn) {
+void Balancer::initiateBalancer(OperationContext* opCtx) {
     stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
     invariant(_state == kStopped);
     _state = kRunning;
 
-    _migrationManager.startRecoveryAndAcquireDistLocks(txn);
+    _migrationManager.startRecoveryAndAcquireDistLocks(opCtx);
 
     invariant(!_thread.joinable());
     invariant(!_threadOperationContext);
@@ -240,15 +240,15 @@ void Balancer::waitForBalancerToStop() {
     LOG(1) << "Balancer thread terminated";
 }
 
-void Balancer::joinCurrentRound(OperationContext* txn) {
+void Balancer::joinCurrentRound(OperationContext* opCtx) {
     stdx::unique_lock<stdx::mutex> scopedLock(_mutex);
     const auto numRoundsAtStart = _numBalancerRounds;
     _condVar.wait(scopedLock,
                   [&] { return !_inBalancerRound || _numBalancerRounds != numRoundsAtStart; });
 }
 
-Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk) {
-    auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(txn, chunk);
+Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk) {
+    auto migrateStatus = _chunkSelectionPolicy->selectSpecificChunkToMove(opCtx, chunk);
     if (!migrateStatus.isOK()) {
         return migrateStatus.getStatus();
     }
@@ -259,37 +259,37 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch
         return Status::OK();
     }
 
-    auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
-    Status refreshStatus = balancerConfig->refreshAndCheck(txn);
+    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+    Status refreshStatus = balancerConfig->refreshAndCheck(opCtx);
     if (!refreshStatus.isOK()) {
         return refreshStatus;
     }
 
-    return _migrationManager.executeManualMigration(txn,
+    return _migrationManager.executeManualMigration(opCtx,
                                                     *migrateInfo,
                                                     balancerConfig->getMaxChunkSizeBytes(),
                                                     balancerConfig->getSecondaryThrottle(),
                                                     balancerConfig->waitForDelete());
 }
 
-Status Balancer::moveSingleChunk(OperationContext* txn,
+Status Balancer::moveSingleChunk(OperationContext* opCtx,
                                  const ChunkType& chunk,
                                  const ShardId& newShardId,
                                  uint64_t maxChunkSizeBytes,
                                  const MigrationSecondaryThrottleOptions& secondaryThrottle,
                                  bool waitForDelete) {
-    auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId);
+    auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(opCtx, chunk, newShardId);
     if (!moveAllowedStatus.isOK()) {
         return moveAllowedStatus;
     }
 
     return _migrationManager.executeManualMigration(
-        txn, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
+        opCtx, MigrateInfo(newShardId, chunk), maxChunkSizeBytes, secondaryThrottle, waitForDelete);
 }
 
-void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
-    auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
-    balancerConfig->refreshAndCheck(txn);
+void Balancer::report(OperationContext* opCtx, BSONObjBuilder* builder) {
+    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+    balancerConfig->refreshAndCheck(opCtx);
 
     const auto mode = balancerConfig->getBalancerMode();
 
@@ -301,27 +301,27 @@ void Balancer::report(OperationContext* txn, BSONObjBuilder* builder) {
 
 void Balancer::_mainThread() {
     Client::initThread("Balancer");
-    auto txn = cc().makeOperationContext();
-    auto shardingContext = Grid::get(txn.get());
+    auto opCtx = cc().makeOperationContext();
+    auto shardingContext = Grid::get(opCtx.get());
 
     log() << "CSRS balancer is starting";
 
     {
         stdx::lock_guard<stdx::mutex> scopedLock(_mutex);
-        _threadOperationContext = txn.get();
+        _threadOperationContext = opCtx.get();
     }
 
     const Seconds kInitBackoffInterval(10);
 
     auto balancerConfig = shardingContext->getBalancerConfiguration();
     while (!_stopRequested()) {
-        Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
+        Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
         if (!refreshStatus.isOK()) {
             warning() << "Balancer settings could not be loaded and will be retried in "
                       << durationCount<Seconds>(kInitBackoffInterval) << " seconds"
                       << causedBy(refreshStatus);
 
-            _sleepFor(txn.get(), kInitBackoffInterval);
+            _sleepFor(opCtx.get(), kInitBackoffInterval);
             continue;
         }
 
@@ -330,8 +330,9 @@ void Balancer::_mainThread() {
 
     log() << "CSRS balancer thread is recovering";
 
-    _migrationManager.finishRecovery(
-        txn.get(), balancerConfig->getMaxChunkSizeBytes(), balancerConfig->getSecondaryThrottle());
+    _migrationManager.finishRecovery(opCtx.get(),
+                                     balancerConfig->getMaxChunkSizeBytes(),
+                                     balancerConfig->getSecondaryThrottle());
 
     log() << "CSRS balancer thread is recovered";
 
@@ -339,23 +340,23 @@ void Balancer::_mainThread() {
     while (!_stopRequested()) {
         BalanceRoundDetails roundDetails;
 
-        _beginRound(txn.get());
+        _beginRound(opCtx.get());
 
         try {
-            shardingContext->shardRegistry()->reload(txn.get());
+            shardingContext->shardRegistry()->reload(opCtx.get());
 
-            uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get()));
+            uassert(13258, "oids broken after resetting!", _checkOIDs(opCtx.get()));
 
-            Status refreshStatus = balancerConfig->refreshAndCheck(txn.get());
+            Status refreshStatus = balancerConfig->refreshAndCheck(opCtx.get());
             if (!refreshStatus.isOK()) {
                 warning() << "Skipping balancing round" << causedBy(refreshStatus);
-                _endRound(txn.get(), kBalanceRoundDefaultInterval);
+                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                 continue;
             }
 
             if (!balancerConfig->shouldBalance()) {
                 LOG(1) << "Skipping balancing round because balancing is disabled";
-                _endRound(txn.get(), kBalanceRoundDefaultInterval);
+                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                 continue;
             }
 
@@ -366,9 +367,9 @@ void Balancer::_mainThread() {
                        << balancerConfig->getSecondaryThrottle().toBSON();
 
                 OCCASIONALLY warnOnMultiVersion(
-                    uassertStatusOK(_clusterStats->getStats(txn.get())));
+                    uassertStatusOK(_clusterStats->getStats(opCtx.get())));
 
-                Status status = _enforceTagRanges(txn.get());
+                Status status = _enforceTagRanges(opCtx.get());
                 if (!status.isOK()) {
                     warning() << "Failed to enforce tag ranges" << causedBy(status);
                 } else {
@@ -376,25 +377,25 @@ void Balancer::_mainThread() {
                 }
 
                 const auto candidateChunks = uassertStatusOK(
-                    _chunkSelectionPolicy->selectChunksToMove(txn.get(), _balancedLastTime));
+                    _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime));
 
                 if (candidateChunks.empty()) {
                     LOG(1) << "no need to move any chunk";
                     _balancedLastTime = false;
                 } else {
-                    _balancedLastTime = _moveChunks(txn.get(), candidateChunks);
+                    _balancedLastTime = _moveChunks(opCtx.get(), candidateChunks);
 
                     roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
                                               _balancedLastTime);
 
-                    shardingContext->catalogClient(txn.get())->logAction(
-                        txn.get(), "balancer.round", "", roundDetails.toBSON());
+                    shardingContext->catalogClient(opCtx.get())
+                        ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON());
                 }
 
                 LOG(1) << "*** End of balancing round";
             }
 
-            _endRound(txn.get(),
+            _endRound(opCtx.get(),
                       _balancedLastTime ? kShortBalanceRoundInterval : kBalanceRoundDefaultInterval);
         } catch (const std::exception& e) {
@@ -406,11 +407,11 @@ void Balancer::_mainThread() {
             // This round failed, tell the world!
             roundDetails.setFailed(e.what());
 
-            shardingContext->catalogClient(txn.get())->logAction(
-                txn.get(), "balancer.round", "", roundDetails.toBSON());
+            shardingContext->catalogClient(opCtx.get())
+                ->logAction(opCtx.get(), "balancer.round", "", roundDetails.toBSON());
 
             // Sleep a fair amount before retrying because of the error
-            _endRound(txn.get(), kBalanceRoundDefaultInterval);
+            _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
         }
     }
 
@@ -437,13 +438,13 @@ bool Balancer::_stopRequested() {
     return (_state != kRunning);
 }
 
-void Balancer::_beginRound(OperationContext* txn) {
+void Balancer::_beginRound(OperationContext* opCtx) {
    stdx::unique_lock<stdx::mutex> lock(_mutex);
    _inBalancerRound = true;
    _condVar.notify_all();
 }
 
-void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) {
+void Balancer::_endRound(OperationContext* opCtx, Seconds waitTimeout) {
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         _inBalancerRound = false;
@@ -451,16 +452,16 @@ void Balancer::_endRound(OperationContext* txn, Seconds waitTimeout) {
         _condVar.notify_all();
     }
 
-    _sleepFor(txn, waitTimeout);
+    _sleepFor(opCtx, waitTimeout);
 }
 
-void Balancer::_sleepFor(OperationContext* txn, Seconds waitTimeout) {
+void Balancer::_sleepFor(OperationContext* opCtx, Seconds waitTimeout) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     _condVar.wait_for(lock, waitTimeout.toSystemDuration(), [&] { return _state != kRunning; });
 }
 
-bool Balancer::_checkOIDs(OperationContext* txn) {
-    auto shardingContext = Grid::get(txn);
+bool Balancer::_checkOIDs(OperationContext* opCtx) {
+    auto shardingContext = Grid::get(opCtx);
 
     vector<ShardId> all;
     shardingContext->shardRegistry()->getAllShardIds(&all);
@@ -473,14 +474,14 @@ bool Balancer::_checkOIDs(OperationContext* txn) {
             return false;
         }
 
-        auto shardStatus = shardingContext->shardRegistry()->getShard(txn, shardId);
+        auto shardStatus = shardingContext->shardRegistry()->getShard(opCtx, shardId);
         if (!shardStatus.isOK()) {
             continue;
         }
         const auto s = shardStatus.getValue();
 
         auto result = uassertStatusOK(
-            s->runCommandWithFixedRetryAttempts(txn,
+            s->runCommandWithFixedRetryAttempts(opCtx,
                                                 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                 "admin",
                                                 BSON("features" << 1),
@@ -497,18 +498,18 @@ bool Balancer::_checkOIDs(OperationContext* txn) {
                       << " and " << oids[x];
 
                 result = uassertStatusOK(s->runCommandWithFixedRetryAttempts(
-                    txn,
+                    opCtx,
                     ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                     "admin",
                     BSON("features" << 1 << "oidReset" << 1),
                     Shard::RetryPolicy::kIdempotent));
                 uassertStatusOK(result.commandStatus);
 
-                auto otherShardStatus = shardingContext->shardRegistry()->getShard(txn, oids[x]);
+                auto otherShardStatus = shardingContext->shardRegistry()->getShard(opCtx, oids[x]);
                 if (otherShardStatus.isOK()) {
                     result = uassertStatusOK(
                         otherShardStatus.getValue()->runCommandWithFixedRetryAttempts(
-                            txn,
+                            opCtx,
                             ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                             "admin",
                             BSON("features" << 1 << "oidReset" << 1),
@@ -526,14 +527,14 @@ bool Balancer::_checkOIDs(OperationContext* txn) {
     return true;
 }
 
-Status Balancer::_enforceTagRanges(OperationContext* txn) {
-    auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(txn);
+Status Balancer::_enforceTagRanges(OperationContext* opCtx) {
+    auto chunksToSplitStatus = _chunkSelectionPolicy->selectChunksToSplit(opCtx);
     if (!chunksToSplitStatus.isOK()) {
         return chunksToSplitStatus.getStatus();
     }
 
     for (const auto& splitInfo : chunksToSplitStatus.getValue()) {
-        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, splitInfo.nss);
+        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, splitInfo.nss);
         if (!scopedCMStatus.isOK()) {
             return scopedCMStatus.getStatus();
         }
@@ -541,7 +542,7 @@ Status Balancer::_enforceTagRanges(OperationContext* txn) {
         const auto& scopedCM = scopedCMStatus.getValue();
 
         auto splitStatus =
-            shardutil::splitChunkAtMultiplePoints(txn,
+            shardutil::splitChunkAtMultiplePoints(opCtx,
                                                   splitInfo.shardId,
                                                   splitInfo.nss,
                                                   scopedCM.cm()->getShardKeyPattern(),
@@ -557,9 +558,9 @@ Status Balancer::_enforceTagRanges(OperationContext* txn) {
     return Status::OK();
 }
 
-int Balancer::_moveChunks(OperationContext* txn,
+int Balancer::_moveChunks(OperationContext* opCtx,
                           const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks) {
-    auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+    auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
 
     // If the balancer was disabled since we started this round, don't start new chunk moves
     if (_stopRequested() || !balancerConfig->shouldBalance()) {
@@ -568,7 +569,7 @@ int Balancer::_moveChunks(OperationContext* txn,
     }
 
     auto migrationStatuses =
-        _migrationManager.executeMigrationsForAutoBalance(txn,
+        _migrationManager.executeMigrationsForAutoBalance(opCtx,
                                                           candidateChunks,
                                                           balancerConfig->getMaxChunkSizeBytes(),
                                                           balancerConfig->getSecondaryThrottle(),
@@ -598,7 +599,7 @@ int Balancer::_moveChunks(OperationContext* txn,
             log() << "Performing a split because migration " << redact(requestIt->toString())
                   << " failed for size reasons" << causedBy(redact(status));
 
-            _splitOrMarkJumbo(txn, NamespaceString(requestIt->ns), requestIt->minKey);
+            _splitOrMarkJumbo(opCtx, NamespaceString(requestIt->ns), requestIt->minKey);
             continue;
         }
 
@@ -609,28 +610,28 @@ int Balancer::_moveChunks(OperationContext* txn,
     return numChunksProcessed;
 }
 
-void Balancer::_splitOrMarkJumbo(OperationContext* txn,
+void Balancer::_splitOrMarkJumbo(OperationContext* opCtx,
                                  const NamespaceString& nss,
                                  const BSONObj& minKey) {
-    auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss));
+    auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
     const auto cm = scopedCM.cm().get();
 
     auto chunk = cm->findIntersectingChunkWithSimpleCollation(minKey);
 
     try {
         const auto splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
-            txn,
+            opCtx,
             chunk->getShardId(),
             nss,
             cm->getShardKeyPattern(),
             ChunkRange(chunk->getMin(), chunk->getMax()),
-            Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
+            Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
             boost::none));
 
         uassert(ErrorCodes::CannotSplit, "No split points found", !splitPoints.empty());
 
         uassertStatusOK(
-            shardutil::splitChunkAtMultiplePoints(txn,
+            shardutil::splitChunkAtMultiplePoints(opCtx,
                                                   chunk->getShardId(),
                                                   nss,
                                                   cm->getShardKeyPattern(),
@@ -644,8 +645,8 @@ void Balancer::_splitOrMarkJumbo(OperationContext* txn,
 
         const std::string chunkName = ChunkType::genID(nss.ns(), chunk->getMin());
 
-        auto status = Grid::get(txn)->catalogClient(txn)->updateConfigDocument(
-            txn,
+        auto status = Grid::get(opCtx)->catalogClient(opCtx)->updateConfigDocument(
+            opCtx,
             ChunkType::ConfigNS,
             BSON(ChunkType::name(chunkName)),
             BSON("$set" << BSON(ChunkType::jumbo(true))),
diff --git a/src/mongo/db/s/balancer/balancer.h b/src/mongo/db/s/balancer/balancer.h
index 9171daac8cd..1537c476357 100644
--- a/src/mongo/db/s/balancer/balancer.h
+++ b/src/mongo/db/s/balancer/balancer.h
@@ -81,7 +81,7 @@ public:
      * waitForBalancerToStop has been called before). Any code in this call must not try to acquire
      * any locks or to wait on operations, which acquire locks.
      */
-    void initiateBalancer(OperationContext* txn);
+    void initiateBalancer(OperationContext* opCtx);
 
     /**
      * Invoked when this node which is currently serving as a 'PRIMARY' steps down and is invoked
@@ -110,7 +110,7 @@ public:
      * Potentially blocking method, which will return immediately if the balancer is not running a
      * balancer round and will block until the current round completes otherwise.
      */
-    void joinCurrentRound(OperationContext* txn);
+    void joinCurrentRound(OperationContext* opCtx);
 
     /**
      * Blocking call, which requests the balancer to move a single chunk to a more appropriate
@@ -118,7 +118,7 @@ public:
      * will actually move because it may already be at the best shard. An error will be returned if
      * the attempt to find a better shard or the actual migration fail for any reason.
      */
-    Status rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk);
+    Status rebalanceSingleChunk(OperationContext* opCtx, const ChunkType& chunk);
 
     /**
      * Blocking call, which requests the balancer to move a single chunk to the specified location
@@ -128,7 +128,7 @@ public:
      * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the
      * move regardless. If should be used only for user-initiated moves.
      */
-    Status moveSingleChunk(OperationContext* txn,
+    Status moveSingleChunk(OperationContext* opCtx,
                            const ChunkType& chunk,
                            const ShardId& newShardId,
                            uint64_t maxChunkSizeBytes,
@@ -138,7 +138,7 @@ public:
     /**
      * Appends the runtime state of the balancer instance to the specified builder.
      */
-    void report(OperationContext* txn, BSONObjBuilder* builder);
+    void report(OperationContext* opCtx, BSONObjBuilder* builder);
 
 private:
     /**
@@ -163,39 +163,39 @@ private:
     /**
      * Signals the beginning and end of a balancing round.
      */
-    void _beginRound(OperationContext* txn);
-    void _endRound(OperationContext* txn, Seconds waitTimeout);
+    void _beginRound(OperationContext* opCtx);
+    void _endRound(OperationContext* opCtx, Seconds waitTimeout);
 
     /**
      * Blocks the caller for the specified timeout or until the balancer condition variable is
      * signaled, whichever comes first.
      */
-    void _sleepFor(OperationContext* txn, Seconds waitTimeout);
+    void _sleepFor(OperationContext* opCtx, Seconds waitTimeout);
 
     /**
      * Returns true if all the servers listed in configdb as being shards are reachable and are
      * distinct processes (no hostname mixup).
      */
-    bool _checkOIDs(OperationContext* txn);
+    bool _checkOIDs(OperationContext* opCtx);
 
     /**
      * Iterates through all chunks in all collections and ensures that no chunks straddle tag
      * boundary. If any do, they will be split.
      */
-    Status _enforceTagRanges(OperationContext* txn);
+    Status _enforceTagRanges(OperationContext* opCtx);
 
     /**
      * Schedules migrations for the specified set of chunks and returns how many chunks were
     * successfully processed.
     */
-    int _moveChunks(OperationContext* txn,
+    int _moveChunks(OperationContext* opCtx,
                     const BalancerChunkSelectionPolicy::MigrateInfoVector& candidateChunks);
 
     /**
      * Performs a split on the chunk with min value "minKey". If the split fails, it is marked as
      * jumbo.
      */
-    void _splitOrMarkJumbo(OperationContext* txn,
+    void _splitOrMarkJumbo(OperationContext* opCtx,
                            const NamespaceString& nss,
                            const BSONObj& minKey);
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h
index e2d7f6a024e..990f5821e08 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy.h
@@ -87,14 +87,14 @@ public:
      * they violate the policy for some reason. The reason is decided by the policy and may include
      * chunk is too big or chunk straddles a tag range.
      */
-    virtual StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) = 0;
+    virtual StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* opCtx) = 0;
 
     /**
      * Potentially blocking method, which gives out a set of chunks to be moved. The
      * aggressiveBalanceHint indicates to the balancing logic that it should lower the threshold for
      * difference in number of chunks across shards and thus potentially cause more chunks to move.
      */
-    virtual StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
+    virtual StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* opCtx,
                                                              bool aggressiveBalanceHint) = 0;
 
     /**
@@ -104,14 +104,14 @@ public:
      * Otherwise returns migration information for where the chunk should be moved.
      */
     virtual StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
-        OperationContext* txn, const ChunkType& chunk) = 0;
+        OperationContext* opCtx, const ChunkType& chunk) = 0;
 
     /**
      * Asks the chunk selection policy to validate that the specified chunk migration is allowed
      * given the current rules. Returns OK if the migration won't violate any rules or any other
      * failed status otherwise.
      */
-    virtual Status checkMoveAllowed(OperationContext* txn,
+    virtual Status checkMoveAllowed(OperationContext* opCtx,
                                     const ChunkType& chunk,
                                     const ShardId& newShardId) = 0;
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 4f3905b61bd..a4574dfc676 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -63,7 +63,7 @@ namespace {
 * distrubution and chunk placement information which is needed by the balancer policy.
 */
 StatusWith<DistributionStatus> createCollectionDistributionStatus(
-    OperationContext* txn, const ShardStatisticsVector& allShards, ChunkManager* chunkMgr) {
+    OperationContext* opCtx, const ShardStatisticsVector& allShards, ChunkManager* chunkMgr) {
     ShardToChunksMap shardToChunksMap;
 
     // Makes sure there is an entry in shardToChunksMap for every shard, so empty shards will also
@@ -87,8 +87,8 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus(
     }
 
     vector<TagsType> collectionTags;
-    Status tagsStatus = Grid::get(txn)->catalogClient(txn)->getTagsForCollection(
-        txn, chunkMgr->getns(), &collectionTags);
+    Status tagsStatus = Grid::get(opCtx)->catalogClient(opCtx)->getTagsForCollection(
+        opCtx, chunkMgr->getns(), &collectionTags);
     if (!tagsStatus.isOK()) {
         return {tagsStatus.code(),
                 str::stream() << "Unable to load tags for collection " << chunkMgr->getns()
@@ -186,8 +186,8 @@ BalancerChunkSelectionPolicyImpl::BalancerChunkSelectionPolicyImpl(ClusterStatis
 BalancerChunkSelectionPolicyImpl::~BalancerChunkSelectionPolicyImpl() = default;
 
 StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSplit(
-    OperationContext* txn) {
-    auto shardStatsStatus = _clusterStats->getStats(txn);
+    OperationContext* opCtx) {
+    auto shardStatsStatus = _clusterStats->getStats(opCtx);
     if (!shardStatsStatus.isOK()) {
         return shardStatsStatus.getStatus();
     }
@@ -196,8 +196,8 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSpli
 
     vector<CollectionType> collections;
 
-    Status collsStatus =
-        Grid::get(txn)->catalogClient(txn)->getCollections(txn, nullptr, &collections, nullptr);
+    Status collsStatus = Grid::get(opCtx)->catalogClient(opCtx)->getCollections(
+        opCtx, nullptr, &collections, nullptr);
     if (!collsStatus.isOK()) {
         return collsStatus;
     }
@@ -215,7 +215,7 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSpli
 
         const NamespaceString nss(coll.getNs());
 
-        auto candidatesStatus = _getSplitCandidatesForCollection(txn, nss, shardStats);
+        auto candidatesStatus = _getSplitCandidatesForCollection(opCtx, nss, shardStats);
         if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
             // Namespace got dropped before we managed to get to it, so just skip it
             continue;
@@ -234,8 +234,8 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSpli
 }
 
 StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove(
-    OperationContext* txn, bool aggressiveBalanceHint) {
-    auto shardStatsStatus = _clusterStats->getStats(txn);
+    OperationContext* opCtx, bool aggressiveBalanceHint) {
+    auto shardStatsStatus = _clusterStats->getStats(opCtx);
     if (!shardStatsStatus.isOK()) {
         return shardStatsStatus.getStatus();
     }
@@ -248,8 +248,8 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
 
     vector<CollectionType> collections;
 
-    Status collsStatus =
-        Grid::get(txn)->catalogClient(txn)->getCollections(txn, nullptr, &collections, nullptr);
+    Status collsStatus = Grid::get(opCtx)->catalogClient(opCtx)->getCollections(
+        opCtx, nullptr, &collections, nullptr);
     if (!collsStatus.isOK()) {
         return collsStatus;
     }
@@ -273,7 +273,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
         }
 
         auto candidatesStatus =
-            _getMigrateCandidatesForCollection(txn, nss, shardStats, aggressiveBalanceHint);
+            _getMigrateCandidatesForCollection(opCtx, nss, shardStats, aggressiveBalanceHint);
         if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
             // Namespace got dropped before we managed to get to it, so just skip it
             continue;
@@ -292,9 +292,9 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
 }
 
 StatusWith<boost::optional<MigrateInfo>>
-BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* txn,
+BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* opCtx,
                                                             const ChunkType& chunk) {
-    auto shardStatsStatus = _clusterStats->getStats(txn);
+    auto shardStatsStatus = _clusterStats->getStats(opCtx);
     if (!shardStatsStatus.isOK()) {
         return shardStatsStatus.getStatus();
     }
@@ -303,7 +303,7 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* tx
 
     const NamespaceString nss(chunk.getNS());
 
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss);
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
     if (!scopedCMStatus.isOK()) {
         return scopedCMStatus.getStatus();
     }
@@ -311,7 +311,7 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* tx
     const auto& scopedCM = scopedCMStatus.getValue();
     const auto cm = scopedCM.cm().get();
 
-    const auto collInfoStatus = createCollectionDistributionStatus(txn, shardStats, cm);
+    const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
         return collInfoStatus.getStatus();
     }
@@ -321,10 +321,10 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* tx
     return BalancerPolicy::balanceSingleChunk(chunk, shardStats, distribution);
 }
 
-Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
+Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCtx,
                                                           const ChunkType& chunk,
                                                           const ShardId& newShardId) {
-    auto shardStatsStatus = _clusterStats->getStats(txn);
+    auto shardStatsStatus = _clusterStats->getStats(opCtx);
     if (!shardStatsStatus.isOK()) {
         return shardStatsStatus.getStatus();
     }
@@ -333,7 +333,7 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
 
     const NamespaceString nss(chunk.getNS());
 
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss);
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
     if (!scopedCMStatus.isOK()) {
         return scopedCMStatus.getStatus();
     }
@@ -341,7 +341,7 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
     const auto& scopedCM = scopedCMStatus.getValue();
     const auto cm = scopedCM.cm().get();
 
-    const auto collInfoStatus = createCollectionDistributionStatus(txn, shardStats, cm);
+    const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
         return collInfoStatus.getStatus();
     }
@@ -365,8 +365,8 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn,
 }
 
 StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection(
-    OperationContext* txn, const NamespaceString& nss, const ShardStatisticsVector& shardStats) {
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss);
+    OperationContext* opCtx, const NamespaceString& nss, const ShardStatisticsVector& shardStats) {
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
     if (!scopedCMStatus.isOK()) {
         return scopedCMStatus.getStatus();
     }
@@ -376,7 +376,7 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidate
 
     const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern();
 
-    const auto collInfoStatus = createCollectionDistributionStatus(txn, shardStats, cm);
+    const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
         return collInfoStatus.getStatus();
     }
@@ -416,11 +416,11 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidate
 }
 
 StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
-    OperationContext* txn,
+    OperationContext* opCtx,
     const NamespaceString& nss,
     const ShardStatisticsVector& shardStats,
     bool aggressiveBalanceHint) {
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss);
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
     if (!scopedCMStatus.isOK()) {
         return scopedCMStatus.getStatus();
     }
@@ -430,7 +430,7 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandi
 
     const auto& shardKeyPattern = cm->getShardKeyPattern().getKeyPattern();
 
-    const auto collInfoStatus = createCollectionDistributionStatus(txn, shardStats, cm);
+    const auto collInfoStatus = createCollectionDistributionStatus(opCtx, shardStats, cm);
     if (!collInfoStatus.isOK()) {
         return collInfoStatus.getStatus();
     }
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
index f010d8c723b..6d200911836 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
@@ -39,15 +39,15 @@ public:
     BalancerChunkSelectionPolicyImpl(ClusterStatistics* clusterStats);
     ~BalancerChunkSelectionPolicyImpl();
 
-    StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* txn) override;
+    StatusWith<SplitInfoVector> selectChunksToSplit(OperationContext* opCtx) override;
 
-    StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* txn,
+    StatusWith<MigrateInfoVector> selectChunksToMove(OperationContext* opCtx,
                                                      bool aggressiveBalanceHint) override;
 
     StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove(
-        OperationContext* txn, const ChunkType& chunk) override;
+        OperationContext* opCtx, const ChunkType& chunk) override;
 
-    Status checkMoveAllowed(OperationContext* txn,
+    Status checkMoveAllowed(OperationContext* opCtx,
                             const ChunkType& chunk,
                             const ShardId& newShardId) override;
 
@@ -57,14 +57,16 @@ private:
     * figure out whether some of them validate the tag range boundaries and need to be split.
     */
    StatusWith<SplitInfoVector> _getSplitCandidatesForCollection(
-        OperationContext* txn, const NamespaceString& nss, const ShardStatisticsVector& shardStats);
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        const ShardStatisticsVector& shardStats);
 
    /**
     * Synchronous method, which iterates the collection's chunks and uses the cluster statistics to
     * figure out where to place them.
     */
    StatusWith<MigrateInfoVector> _getMigrateCandidatesForCollection(
-        OperationContext* txn,
+        OperationContext* opCtx,
        const NamespaceString& nss,
        const ShardStatisticsVector& shardStats,
        bool aggressiveBalanceHint);
diff --git a/src/mongo/db/s/balancer/cluster_statistics.h b/src/mongo/db/s/balancer/cluster_statistics.h
index 2717c42b7ee..59435bb8dde 100644
--- a/src/mongo/db/s/balancer/cluster_statistics.h
+++ b/src/mongo/db/s/balancer/cluster_statistics.h
@@ -106,7 +106,7 @@ public:
     * Retrieves a snapshot of the current shard utilization state. The implementation of this
     * method may block if necessary in order to refresh its state or may return a cached value.
     */
-    virtual StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) = 0;
+    virtual StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* opCtx) = 0;
 
 protected:
     ClusterStatistics();
diff --git a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp
index 6ae4d9c223f..0547cea1124 100644
--- a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp
+++ b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp
@@ -60,16 +60,16 @@ const char kVersionField[] = "version";
 *  ShardNotFound if shard by that id is not available on the registry
 *  NoSuchKey if the version could not be retrieved
 */
-StatusWith<string> retrieveShardMongoDVersion(OperationContext* txn, ShardId shardId) {
-    auto shardRegistry = Grid::get(txn)->shardRegistry();
-    auto shardStatus = shardRegistry->getShard(txn, shardId);
+StatusWith<string> retrieveShardMongoDVersion(OperationContext* opCtx, ShardId shardId) {
+    auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+    auto shardStatus = shardRegistry->getShard(opCtx, shardId);
     if (!shardStatus.isOK()) {
         return shardStatus.getStatus();
     }
     auto shard = shardStatus.getValue();
 
     auto commandResponse =
-        shard->runCommandWithFixedRetryAttempts(txn,
+        shard->runCommandWithFixedRetryAttempts(opCtx,
                                                 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                 "admin",
                                                 BSON("serverStatus" << 1),
@@ -100,14 +100,14 @@ ClusterStatisticsImpl::ClusterStatisticsImpl() = default;
 
 ClusterStatisticsImpl::~ClusterStatisticsImpl() = default;
 
-StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationContext* txn) {
+StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationContext* opCtx) {
     // Get a list of all the shards that are participating in this balance round along with any
     // maximum allowed quotas and current utilization. We get the latter by issuing
     // db.serverStatus() (mem.mapped) to all shards.
     //
     // TODO: skip unresponsive shards and mark information as stale.
-    auto shardsStatus = Grid::get(txn)->catalogClient(txn)->getAllShards(
-        txn, repl::ReadConcernLevel::kMajorityReadConcern);
+    auto shardsStatus = Grid::get(opCtx)->catalogClient(opCtx)->getAllShards(
+        opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
     if (!shardsStatus.isOK()) {
         return shardsStatus.getStatus();
     }
@@ -117,7 +117,7 @@ StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationCon
 
     vector<ShardStatistics> stats;
 
     for (const auto& shard : shards) {
-        auto shardSizeStatus = shardutil::retrieveTotalShardSize(txn, shard.getName());
+        auto shardSizeStatus = shardutil::retrieveTotalShardSize(opCtx, shard.getName());
         if (!shardSizeStatus.isOK()) {
             const Status& status = shardSizeStatus.getStatus();
 
@@ -130,7 +130,7 @@ StatusWith<vector<ShardStatistics>> ClusterStatisticsImpl::getStats(OperationCon
 
         string mongoDVersion;
 
-        auto mongoDVersionStatus = retrieveShardMongoDVersion(txn, shard.getName());
+        auto mongoDVersionStatus = retrieveShardMongoDVersion(opCtx, shard.getName());
         if (mongoDVersionStatus.isOK()) {
             mongoDVersion = std::move(mongoDVersionStatus.getValue());
         } else {
diff --git a/src/mongo/db/s/balancer/cluster_statistics_impl.h b/src/mongo/db/s/balancer/cluster_statistics_impl.h
index d03a2f2b403..6d5524a5b1a 100644
--- a/src/mongo/db/s/balancer/cluster_statistics_impl.h
+++ b/src/mongo/db/s/balancer/cluster_statistics_impl.h
@@ -42,7 +42,7 @@ public:
     ClusterStatisticsImpl();
     ~ClusterStatisticsImpl();
 
-    StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* txn) override;
+    StatusWith<std::vector<ShardStatistics>> getStats(OperationContext* opCtx) override;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/balancer/migration_manager.cpp b/src/mongo/db/s/balancer/migration_manager.cpp
index 7882201e8c2..7f267b97e67 100644
--- a/src/mongo/db/s/balancer/migration_manager.cpp
+++ b/src/mongo/db/s/balancer/migration_manager.cpp
@@ -111,7 +111,7 @@ MigrationManager::~MigrationManager() {
 }
 
 MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
-    OperationContext* txn,
+    OperationContext* opCtx,
     const vector<MigrateInfo>& migrateInfos,
     uint64_t maxChunkSizeBytes,
     const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -127,7 +127,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
 
             // Write a document to the config.migrations collection, in case this migration must be
             // recovered by the Balancer. Fail if the chunk is already moving.
             auto statusWithScopedMigrationRequest =
-                ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete);
+                ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
             if (!statusWithScopedMigrationRequest.isOK()) {
                 migrationStatuses.emplace(migrateInfo.getName(),
                                           std::move(statusWithScopedMigrationRequest.getStatus()));
@@ -137,7 +137,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
                 std::move(statusWithScopedMigrationRequest.getValue()));
 
             responses.emplace_back(
-                _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete),
+                _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete),
                 migrateInfo);
         }
 
@@ -162,7 +162,7 @@ MigrationStatuses MigrationManager::executeMigrationsForAutoBalance(
 }
 
 Status MigrationManager::executeManualMigration(
-    OperationContext* txn,
+    OperationContext* opCtx,
     const MigrateInfo& migrateInfo,
     uint64_t maxChunkSizeBytes,
     const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -172,15 +172,15 @@ Status MigrationManager::executeManualMigration(
     // Write a document to the config.migrations collection, in case this migration must be
     // recovered by the Balancer. Fail if the chunk is already moving.
     auto statusWithScopedMigrationRequest =
-        ScopedMigrationRequest::writeMigration(txn, migrateInfo, waitForDelete);
+        ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
     if (!statusWithScopedMigrationRequest.isOK()) {
         return statusWithScopedMigrationRequest.getStatus();
     }
 
     RemoteCommandResponse remoteCommandResponse =
-        _schedule(txn, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();
+        _schedule(opCtx, migrateInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete)->get();
 
-    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, NamespaceString(migrateInfo.ns));
+    auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, NamespaceString(migrateInfo.ns));
     if (!scopedCMStatus.isOK()) {
         return scopedCMStatus.getStatus();
     }
@@ -204,7 +204,7 @@ Status MigrationManager::executeManualMigration(
     return commandStatus;
 }
 
-void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
+void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* opCtx) {
     {
         stdx::lock_guard<stdx::mutex> lock(_mutex);
         invariant(_state == State::kStopped);
@@ -214,15 +214,15 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
 
     auto scopedGuard = MakeGuard([&] {
         _migrationRecoveryMap.clear();
-        _abandonActiveMigrationsAndEnableManager(txn);
+        _abandonActiveMigrationsAndEnableManager(opCtx);
     });
 
-    auto distLockManager = Grid::get(txn)->catalogClient(txn)->getDistLockManager();
+    auto distLockManager = Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager();
 
     // Load the active migrations from the config.migrations collection.
     auto statusWithMigrationsQueryResponse =
-        Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-            txn,
+        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+            opCtx,
             ReadPreferenceSetting{ReadPreference::PrimaryOnly},
             repl::ReadConcernLevel::kLocalReadConcern,
             NamespaceString(MigrationType::ConfigNS),
@@ -260,7 +260,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
                                     << migrateType.getNss().ns());
 
         auto statusWithDistLockHandle = distLockManager->tryLockWithLocalWriteConcern(
-            txn, migrateType.getNss().ns(), whyMessage, _lockSessionID);
+            opCtx, migrateType.getNss().ns(), whyMessage, _lockSessionID);
         if (!statusWithDistLockHandle.isOK()) {
             log() << "Failed to acquire distributed lock for collection '"
                   << migrateType.getNss().ns()
@@ -277,7 +277,7 @@ void MigrationManager::startRecoveryAndAcquireDistLocks(OperationContext* txn) {
     scopedGuard.Dismiss();
 }
 
-void MigrationManager::finishRecovery(OperationContext* txn,
+void MigrationManager::finishRecovery(OperationContext* opCtx,
                                       uint64_t maxChunkSizeBytes,
                                       const MigrationSecondaryThrottleOptions& secondaryThrottle) {
     {
@@ -298,7 +298,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
 
     auto scopedGuard = MakeGuard([&] {
         _migrationRecoveryMap.clear();
-        _abandonActiveMigrationsAndEnableManager(txn);
+        _abandonActiveMigrationsAndEnableManager(opCtx);
     });
 
     // Schedule recovered migrations.
@@ -310,7 +310,7 @@ void MigrationManager::finishRecovery(OperationContext* txn,
         auto& migrateInfos = nssAndMigrateInfos.second;
         invariant(!migrateInfos.empty());
 
-        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(txn, nss);
+        auto scopedCMStatus = ScopedChunkManager::refreshAndGet(opCtx, nss);
         if (!scopedCMStatus.isOK()) {
             // This shouldn't happen because the collection was intact and sharded when the previous
             // config primary was active and the dist locks have been held by the balancer
@@ -338,23 +338,23 @@ void MigrationManager::finishRecovery(OperationContext* txn,
             if (chunk->getShardId() != migrationInfo.from) {
                 // Chunk is no longer on the source shard specified by this migration. Erase the
                 // migration recovery document associated with it.
-                ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey);
+                ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey);
                 continue;
             }
 
             scopedMigrationRequests.emplace_back(
-                ScopedMigrationRequest::createForRecovery(txn, nss, migrationInfo.minKey));
+                ScopedMigrationRequest::createForRecovery(opCtx, nss, migrationInfo.minKey));
 
             scheduledMigrations++;
 
-            responses.emplace_back(
-                _schedule(txn, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
+            responses.emplace_back(_schedule(
+                opCtx, migrationInfo, maxChunkSizeBytes, secondaryThrottle, waitForDelete));
         }
 
         // If no migrations were scheduled for this namespace, free the dist lock
         if (!scheduledMigrations) {
-            Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
-                txn, _lockSessionID, nss.ns());
+            Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock(
+                opCtx, _lockSessionID, nss.ns());
         }
     }
 
@@ -408,7 +408,7 @@ void MigrationManager::drainActiveMigrations() {
 }
 
 shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
-    OperationContext* txn,
+    OperationContext* opCtx,
     const MigrateInfo& migrateInfo,
     uint64_t maxChunkSizeBytes,
     const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -425,15 +425,16 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
         }
     }
 
-    const auto fromShardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from);
+    const auto fromShardStatus =
+        Grid::get(opCtx)->shardRegistry()->getShard(opCtx, migrateInfo.from);
     if (!fromShardStatus.isOK()) {
         return std::make_shared<Notification<RemoteCommandResponse>>(
             std::move(fromShardStatus.getStatus()));
     }
 
     const auto fromShard = fromShardStatus.getValue();
-    auto fromHostStatus =
-        fromShard->getTargeter()->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
+    auto fromHostStatus = fromShard->getTargeter()->findHost(
+        opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
     if (!fromHostStatus.isOK()) {
         return std::make_shared<Notification<RemoteCommandResponse>>(
             std::move(fromHostStatus.getStatus()));
@@ -444,7 +445,7 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
         &builder,
         nss,
         migrateInfo.version,
-        repl::ReplicationCoordinator::get(txn)->getConfig().getConnectionString(),
+        repl::ReplicationCoordinator::get(opCtx)->getConfig().getConnectionString(),
         migrateInfo.from,
         migrateInfo.to,
         ChunkRange(migrateInfo.minKey, migrateInfo.maxKey),
@@ -464,15 +465,16 @@ shared_ptr<Notification<RemoteCommandResponse>> MigrationManager::_schedule(
 
     auto retVal = migration.completionNotification;
 
-    _schedule_inlock(txn, fromHostStatus.getValue(), std::move(migration));
+    _schedule_inlock(opCtx, fromHostStatus.getValue(), std::move(migration));
 
     return retVal;
 }
 
-void MigrationManager::_schedule_inlock(OperationContext* txn,
+void MigrationManager::_schedule_inlock(OperationContext* opCtx,
                                         const HostAndPort& targetHost,
                                         Migration migration) {
-    executor::TaskExecutor* const executor = Grid::get(txn)->getExecutorPool()->getFixedExecutor();
+    executor::TaskExecutor* const executor =
+        Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
 
     const NamespaceString nss(migration.nss);
 
@@ -482,8 +484,8 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
 
         // Acquire the collection distributed lock (blocking call)
        auto statusWithDistLockHandle =
-            Grid::get(txn)->catalogClient(txn)->getDistLockManager()->lockWithSessionID(
-                txn,
+            Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->lockWithSessionID(
+                opCtx,
                 nss.ns(),
                 whyMessage,
                 _lockSessionID,
@@ -508,7 +510,7 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
     auto itMigration = migrations->begin();
 
     const RemoteCommandRequest remoteRequest(
-        targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, txn);
+        targetHost, NamespaceString::kAdminDb.toString(), itMigration->moveChunkCmdObj, opCtx);
 
     StatusWith<executor::TaskExecutor::CallbackHandle> callbackHandleWithStatus =
         executor->scheduleRemoteCommand(
@@ -516,10 +518,10 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
             [this, itMigration](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
                 Client::initThread(getThreadName().c_str());
                 ON_BLOCK_EXIT([&] { Client::destroy(); });
-                auto txn = cc().makeOperationContext();
+                auto opCtx = cc().makeOperationContext();
 
                 stdx::lock_guard<stdx::mutex> lock(_mutex);
-                _complete_inlock(txn.get(), itMigration, args.response);
+                _complete_inlock(opCtx.get(), itMigration, args.response);
             });
 
     if (callbackHandleWithStatus.isOK()) {
@@ -527,10 +529,10 @@ void MigrationManager::_schedule_inlock(OperationContext* txn,
         return;
     }
 
-    _complete_inlock(txn, itMigration, std::move(callbackHandleWithStatus.getStatus()));
+    _complete_inlock(opCtx, itMigration, std::move(callbackHandleWithStatus.getStatus()));
 }
 
-void MigrationManager::_complete_inlock(OperationContext* txn,
+void MigrationManager::_complete_inlock(OperationContext* opCtx,
                                         MigrationsList::iterator itMigration,
                                         const RemoteCommandResponse& remoteCommandResponse) {
     const NamespaceString nss(itMigration->nss);
@@ -547,8 +549,8 @@ void MigrationManager::_complete_inlock(OperationContext* txn,
     migrations->erase(itMigration);
 
     if (migrations->empty()) {
-        Grid::get(txn)->catalogClient(txn)->getDistLockManager()->unlock(
-            txn, _lockSessionID, nss.ns());
+        Grid::get(opCtx)->catalogClient(opCtx)->getDistLockManager()->unlock(
+            opCtx, _lockSessionID, nss.ns());
         _activeMigrations.erase(it);
         _checkDrained_inlock();
     }
@@ -572,7 +574,7 @@ void MigrationManager::_waitForRecovery() {
     _condVar.wait(lock, [this] { return _state != State::kRecovering; });
 }
 
-void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* txn) {
+void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext* opCtx) {
     stdx::unique_lock<stdx::mutex> lock(_mutex);
     if (_state == State::kStopping) {
         // The balancer was interrupted. Let the next balancer recover the state.
@@ -580,16 +582,16 @@ void MigrationManager::_abandonActiveMigrationsAndEnableManager(OperationContext
     }
     invariant(_state == State::kRecovering);
 
-    auto catalogClient = Grid::get(txn)->catalogClient(txn);
+    auto catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
 
     // Unlock all balancer distlocks we aren't using anymore.
     auto distLockManager = catalogClient->getDistLockManager();
-    distLockManager->unlockAll(txn, distLockManager->getProcessID());
+    distLockManager->unlockAll(opCtx, distLockManager->getProcessID());
 
     // Clear the config.migrations collection so that those chunks can be scheduled for migration
     // again.
     catalogClient->removeConfigDocuments(
-        txn, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);
+        opCtx, MigrationType::ConfigNS, BSONObj(), kMajorityWriteConcern);
 
     _state = State::kEnabled;
     _condVar.notify_all();
diff --git a/src/mongo/db/s/balancer/migration_manager.h b/src/mongo/db/s/balancer/migration_manager.h
index 011397c412a..26da3f057f7 100644
--- a/src/mongo/db/s/balancer/migration_manager.h
+++ b/src/mongo/db/s/balancer/migration_manager.h
@@ -79,7 +79,7 @@ public:
     * Returns a map of migration Status objects to indicate the success/failure of each migration.
     */
    MigrationStatuses executeMigrationsForAutoBalance(
-        OperationContext* txn,
+        OperationContext* opCtx,
        const std::vector<MigrateInfo>& migrateInfos,
        uint64_t maxChunkSizeBytes,
        const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -92,7 +92,7 @@ public:
     *
     * Returns the status of the migration.
     */
-    Status executeManualMigration(OperationContext* txn,
+    Status executeManualMigration(OperationContext* opCtx,
                                  const MigrateInfo& migrateInfo,
                                  uint64_t maxChunkSizeBytes,
                                  const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -106,7 +106,7 @@ public:
     *
     * The active migration recovery may fail and be abandoned, setting the state to kEnabled.
     */
-    void startRecoveryAndAcquireDistLocks(OperationContext* txn);
+    void startRecoveryAndAcquireDistLocks(OperationContext* opCtx);
 
    /**
     * Blocking method that must only be called after startRecovery has been called. Recovers the
@@ -118,7 +118,7 @@ public:
     * The active migration recovery may fail and be abandoned, setting the state to kEnabled and
     * unblocking any process waiting on the recovery state.
     */
-    void finishRecovery(OperationContext* txn,
+    void finishRecovery(OperationContext* opCtx,
                        uint64_t maxChunkSizeBytes,
                        const MigrationSecondaryThrottleOptions& secondaryThrottle);
 
@@ -181,7 +181,7 @@ private:
     * can be used to obtain the outcome of the operation.
     */
    std::shared_ptr<Notification<executor::RemoteCommandResponse>> _schedule(
-        OperationContext* txn,
+        OperationContext* opCtx,
        const MigrateInfo& migrateInfo,
        uint64_t maxChunkSizeBytes,
        const MigrationSecondaryThrottleOptions& secondaryThrottle,
@@ -194,7 +194,7 @@ private:
     * The distributed lock is acquired before scheduling the first migration for the collection and
     * is only released when all active migrations on the collection have finished.
     */
-    void _schedule_inlock(OperationContext* txn,
+    void _schedule_inlock(OperationContext* opCtx,
                          const HostAndPort& targetHost,
                          Migration migration);
 
@@ -204,7 +204,7 @@ private:
     * passed iterator and if this is the last migration for the collection will free the collection
     * distributed lock.
     */
-    void _complete_inlock(OperationContext* txn,
+    void _complete_inlock(OperationContext* opCtx,
                          MigrationsList::iterator itMigration,
                          const executor::RemoteCommandResponse& remoteCommandResponse);
 
@@ -226,7 +226,7 @@ private:
     * that the balancer holds, clears the config.migrations collection, changes the state of the
     * migration manager to kEnabled. Then unblocks all processes waiting for kEnabled state.
     */
-    void _abandonActiveMigrationsAndEnableManager(OperationContext* txn);
+    void _abandonActiveMigrationsAndEnableManager(OperationContext* opCtx);
 
    /**
     * Parses a moveChunk RemoteCommandResponse's two levels of Status objects and distiguishes
diff --git a/src/mongo/db/s/balancer/migration_manager_test.cpp b/src/mongo/db/s/balancer/migration_manager_test.cpp
index b763cafd25b..dd2ee3d84d6 100644
--- a/src/mongo/db/s/balancer/migration_manager_test.cpp
+++ b/src/mongo/db/s/balancer/migration_manager_test.cpp
@@ -78,7 +78,7 @@ protected:
     /**
      * Returns the mock targeter for the specified shard. Useful to use like so
      *
-     *     shardTargeterMock(txn, shardId)->setFindHostReturnValue(shardHost);
+     *     shardTargeterMock(opCtx, shardId)->setFindHostReturnValue(shardHost);
      *
      * Then calls to RemoteCommandTargeterMock::findHost will return HostAndPort "shardHost" for
      * Shard "shardId".
@@ -86,7 +86,7 @@ protected:
      * Scheduling a command requires a shard host target. The command will be caught by the mock
      * network, but sending the command requires finding the shard's host.
      */
-    std::shared_ptr<RemoteCommandTargeterMock> shardTargeterMock(OperationContext* txn,
+    std::shared_ptr<RemoteCommandTargeterMock> shardTargeterMock(OperationContext* opCtx,
                                                                  ShardId shardId);
 
     /**
@@ -174,9 +174,9 @@ void MigrationManagerTest::tearDown() {
 }
 
 std::shared_ptr<RemoteCommandTargeterMock> MigrationManagerTest::shardTargeterMock(
-    OperationContext* txn, ShardId shardId) {
+    OperationContext* opCtx, ShardId shardId) {
     return RemoteCommandTargeterMock::get(
-        uassertStatusOK(shardRegistry()->getShard(txn, shardId))->getTargeter());
+        uassertStatusOK(shardRegistry()->getShard(opCtx, shardId))->getTargeter());
 }
 
 void MigrationManagerTest::setUpDatabase(const std::string& dbName, const ShardId primaryShard) {
@@ -315,15 +315,15 @@ TEST_F(MigrationManagerTest, OneCollectionTwoMigrations) {
     auto future = launchAsync([this, migrationRequests] {
         ON_BLOCK_EXIT([&] { Client::destroy(); });
         Client::initThreadIfNotAlready("Test");
-        auto txn = cc().makeOperationContext();
+        auto opCtx = cc().makeOperationContext();
 
         // Scheduling the moveChunk commands requires finding a host to which to send the command.
         // Set up dummy hosts for the source shards.
-        shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
-        shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
+        shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0);
+        shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2);
 
         MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance(
-            txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
+            opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false);
 
         for (const auto& migrateInfo : migrationRequests) {
             ASSERT_OK(migrationStatuses.at(migrateInfo.getName()));
@@ -378,15 +378,15 @@ TEST_F(MigrationManagerTest, TwoCollectionsTwoMigrationsEach) {
     auto future = launchAsync([this, migrationRequests] {
         ON_BLOCK_EXIT([&] { Client::destroy(); });
         Client::initThreadIfNotAlready("Test");
-        auto txn = cc().makeOperationContext();
+        auto opCtx = cc().makeOperationContext();
 
         // Scheduling the moveChunk commands requires finding a host to which to send the command.
         // Set up dummy hosts for the source shards.
- shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2); + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); + opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); for (const auto& migrateInfo : migrationRequests) { ASSERT_OK(migrationStatuses.at(migrateInfo.getName())); @@ -433,17 +433,17 @@ TEST_F(MigrationManagerTest, SourceShardNotFound) { auto future = launchAsync([this, chunk1, chunk2, migrationRequests] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Scheduling a moveChunk command requires finding a host to which to send the command. Set // up a dummy host for kShardHost0, and return an error for kShardHost3. - shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(txn.get(), kShardId2) + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId2) ->setFindHostReturnValue( Status(ErrorCodes::ReplicaSetNotFound, "SourceShardNotFound generated error.")); MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); + opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); ASSERT_OK(migrationStatuses.at(chunk1.getName())); ASSERT_EQ(ErrorCodes::ReplicaSetNotFound, migrationStatuses.at(chunk2.getName())); @@ -480,14 +480,14 @@ TEST_F(MigrationManagerTest, JumboChunkResponseBackwardsCompatibility) { auto future = launchAsync([this, chunk1, migrationRequests] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Scheduling a moveChunk command requires finding a host to which to send the command. Set // up a dummy host for kShardHost0. - shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - txn.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); + opCtx.get(), migrationRequests, 0, kDefaultSecondaryThrottle, false); ASSERT_EQ(ErrorCodes::ChunkTooBig, migrationStatuses.at(chunk1.getName())); }); @@ -519,15 +519,15 @@ TEST_F(MigrationManagerTest, InterruptMigration) { auto future = launchAsync([&] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Scheduling a moveChunk command requires finding a host to which to send the command. Set // up a dummy host for kShardHost0. 
- shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); ASSERT_EQ(ErrorCodes::BalancerInterrupted, _migrationManager->executeManualMigration( - txn.get(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); + opCtx.get(), {kShardId1, chunk}, 0, kDefaultSecondaryThrottle, false)); }); // Wait till the move chunk request gets sent and pretend that it is stuck by never responding @@ -608,14 +608,14 @@ TEST_F(MigrationManagerTest, RestartMigrationManager) { auto future = launchAsync([&] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Scheduling a moveChunk command requires finding a host to which to send the command. Set // up a dummy host for kShardHost0. - shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); ASSERT_OK(_migrationManager->executeManualMigration( - txn.get(), {kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false)); + opCtx.get(), {kShardId1, chunk1}, 0, kDefaultSecondaryThrottle, false)); }); // Expect only one moveChunk command to be called. @@ -663,14 +663,14 @@ TEST_F(MigrationManagerTest, MigrationRecovery) { auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Scheduling the moveChunk commands requires finding hosts to which to send the commands. // Set up dummy hosts for the source shards. - shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2); + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); - _migrationManager->finishRecovery(txn.get(), 0, kDefaultSecondaryThrottle); + _migrationManager->finishRecovery(opCtx.get(), 0, kDefaultSecondaryThrottle); }); // Expect two moveChunk commands. @@ -765,15 +765,15 @@ TEST_F(MigrationManagerTest, RemoteCallErrorConversionToOperationFailed) { auto future = launchAsync([&] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Scheduling the moveChunk commands requires finding a host to which to send the command. // Set up dummy hosts for the source shards. 
- shardTargeterMock(txn.get(), kShardId0)->setFindHostReturnValue(kShardHost0); - shardTargeterMock(txn.get(), kShardId2)->setFindHostReturnValue(kShardHost2); + shardTargeterMock(opCtx.get(), kShardId0)->setFindHostReturnValue(kShardHost0); + shardTargeterMock(opCtx.get(), kShardId2)->setFindHostReturnValue(kShardHost2); MigrationStatuses migrationStatuses = _migrationManager->executeMigrationsForAutoBalance( - txn.get(), + opCtx.get(), {{kShardId1, chunk1}, {kShardId3, chunk2}}, 0, kDefaultSecondaryThrottle, diff --git a/src/mongo/db/s/balancer/scoped_migration_request.cpp b/src/mongo/db/s/balancer/scoped_migration_request.cpp index af737fd0640..bbbcb0174f0 100644 --- a/src/mongo/db/s/balancer/scoped_migration_request.cpp +++ b/src/mongo/db/s/balancer/scoped_migration_request.cpp @@ -49,14 +49,14 @@ const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority, const int kDuplicateKeyErrorMaxRetries = 2; } -ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* txn, +ScopedMigrationRequest::ScopedMigrationRequest(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) - : _txn(txn), _nss(nss), _minKey(minKey) {} + : _opCtx(opCtx), _nss(nss), _minKey(minKey) {} ScopedMigrationRequest::~ScopedMigrationRequest() { - if (!_txn) { - // If the txn object was cleared, nothing should happen in the destructor. + if (!_opCtx) { + // If the opCtx object was cleared, nothing should happen in the destructor. return; } @@ -64,8 +64,8 @@ ScopedMigrationRequest::~ScopedMigrationRequest() { // okay. BSONObj migrationDocumentIdentifier = BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey)); - Status result = grid.catalogClient(_txn)->removeConfigDocuments( - _txn, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); + Status result = grid.catalogClient(_opCtx)->removeConfigDocuments( + _opCtx, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); if (!result.isOK()) { LOG(0) << "Failed to remove config.migrations document for migration '" @@ -75,31 +75,31 @@ ScopedMigrationRequest::~ScopedMigrationRequest() { ScopedMigrationRequest::ScopedMigrationRequest(ScopedMigrationRequest&& other) { *this = std::move(other); - // Set txn to null so that the destructor will do nothing. - other._txn = nullptr; + // Set opCtx to null so that the destructor will do nothing. + other._opCtx = nullptr; } ScopedMigrationRequest& ScopedMigrationRequest::operator=(ScopedMigrationRequest&& other) { if (this != &other) { - _txn = other._txn; + _opCtx = other._opCtx; _nss = other._nss; _minKey = other._minKey; - // Set txn to null so that the destructor will do nothing. - other._txn = nullptr; + // Set opCtx to null so that the destructor will do nothing. + other._opCtx = nullptr; } return *this; } StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( - OperationContext* txn, const MigrateInfo& migrateInfo, bool waitForDelete) { + OperationContext* opCtx, const MigrateInfo& migrateInfo, bool waitForDelete) { // Try to write a unique migration document to config.migrations. 
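// Illustrative sketch, not part of this commit: the moved-from-pointer idiom
// used by ScopedMigrationRequest above, reduced to a self-contained guard.
// Nulling the source's pointer in the move operations is exactly what makes
// "the destructor will do nothing" safe. 'CleanupGuard', 'Resource', and
// 'releaseResource' are invented names for illustration.
struct Resource;
void releaseResource(Resource* r);  // hypothetical cleanup hook

class CleanupGuard {
public:
    explicit CleanupGuard(Resource* r) : _r(r) {}

    ~CleanupGuard() {
        if (_r) {
            releaseResource(_r);  // skipped entirely once moved-from
        }
    }

    CleanupGuard(CleanupGuard&& other) : _r(other._r) {
        other._r = nullptr;  // the moved-from destructor becomes a no-op
    }

    CleanupGuard& operator=(CleanupGuard&& other) {
        if (this != &other) {
            // Like ScopedMigrationRequest above, this assumes assignment onto
            // an empty or expired guard; the current _r is not released first.
            _r = other._r;
            other._r = nullptr;
        }
        return *this;
    }

private:
    Resource* _r;
};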
const MigrationType migrationType(migrateInfo, waitForDelete); for (int retry = 0; retry < kDuplicateKeyErrorMaxRetries; ++retry) { - Status result = grid.catalogClient(txn)->insertConfigDocument( - txn, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern); + Status result = grid.catalogClient(opCtx)->insertConfigDocument( + opCtx, MigrationType::ConfigNS, migrationType.toBSON(), kMajorityWriteConcern); if (result == ErrorCodes::DuplicateKey) { // If the exact migration described by "migrateInfo" is active, return a scoped object @@ -107,7 +107,7 @@ StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( // scheduled. auto statusWithMigrationQueryResult = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(MigrationType::ConfigNS), @@ -160,7 +160,7 @@ StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( // safe (won't delete another migration's document) and necessary to try to clean up the // document via the destructor. ScopedMigrationRequest scopedMigrationRequest( - txn, NamespaceString(migrateInfo.ns), migrateInfo.minKey); + opCtx, NamespaceString(migrateInfo.ns), migrateInfo.minKey); // If there was a write error, let the object go out of scope and clean up in the // destructor. @@ -180,28 +180,28 @@ StatusWith<ScopedMigrationRequest> ScopedMigrationRequest::writeMigration( << "' was being moved (somewhere) by another operation."); } -ScopedMigrationRequest ScopedMigrationRequest::createForRecovery(OperationContext* txn, +ScopedMigrationRequest ScopedMigrationRequest::createForRecovery(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey) { - return ScopedMigrationRequest(txn, nss, minKey); + return ScopedMigrationRequest(opCtx, nss, minKey); } Status ScopedMigrationRequest::tryToRemoveMigration() { - invariant(_txn); + invariant(_opCtx); BSONObj migrationDocumentIdentifier = BSON(MigrationType::ns(_nss.ns()) << MigrationType::min(_minKey)); - Status status = grid.catalogClient(_txn)->removeConfigDocuments( - _txn, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); + Status status = grid.catalogClient(_opCtx)->removeConfigDocuments( + _opCtx, MigrationType::ConfigNS, migrationDocumentIdentifier, kMajorityWriteConcern); if (status.isOK()) { // Don't try to do a no-op remove in the destructor. - _txn = nullptr; + _opCtx = nullptr; } return status; } void ScopedMigrationRequest::keepDocumentOnDestruct() { - invariant(_txn); - _txn = nullptr; + invariant(_opCtx); + _opCtx = nullptr; LOG(1) << "Keeping config.migrations document with namespace '" << _nss << "' and minKey '" << _minKey << "' for balancer recovery"; } diff --git a/src/mongo/db/s/balancer/scoped_migration_request.h b/src/mongo/db/s/balancer/scoped_migration_request.h index e3b4e3301da..b3f100d92d6 100644 --- a/src/mongo/db/s/balancer/scoped_migration_request.h +++ b/src/mongo/db/s/balancer/scoped_migration_request.h @@ -66,7 +66,7 @@ public: * * The destructor will handle removing the document when it is no longer needed. */ - static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* txn, + static StatusWith<ScopedMigrationRequest> writeMigration(OperationContext* opCtx, const MigrateInfo& migrate, bool waitForDelete); @@ -77,7 +77,7 @@ public: * This should only be used on Balancer recovery when a config.migrations document already * exists for the migration. 
*/ - static ScopedMigrationRequest createForRecovery(OperationContext* txn, + static ScopedMigrationRequest createForRecovery(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey); @@ -102,12 +102,12 @@ public: void keepDocumentOnDestruct(); private: - ScopedMigrationRequest(OperationContext* txn, + ScopedMigrationRequest(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey); // Need an operation context with which to do a write in the destructor. - OperationContext* _txn; + OperationContext* _opCtx; // ns and minkey are needed to identify the migration document when it is removed from // config.migrations by the destructor. diff --git a/src/mongo/db/s/check_sharding_index_command.cpp b/src/mongo/db/s/check_sharding_index_command.cpp index 0d7ef33de31..79f4b50cbc9 100644 --- a/src/mongo/db/s/check_sharding_index_command.cpp +++ b/src/mongo/db/s/check_sharding_index_command.cpp @@ -82,7 +82,7 @@ public: return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& dbname, BSONObj& jsobj, int options, @@ -108,7 +108,7 @@ public: return false; } - AutoGetCollection autoColl(txn, nss, MODE_IS); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); Collection* const collection = autoColl.getCollection(); if (!collection) { @@ -117,7 +117,7 @@ public: } IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, + collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, true); // requireSingleKey if (idx == NULL) { @@ -136,7 +136,7 @@ public: } unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(txn, + InternalPlanner::indexScan(opCtx, collection, idx, min, @@ -150,7 +150,7 @@ public: // this index. // NOTE A local copy of 'missingField' is made because indices may be // invalidated during a db lock yield. - BSONObj missingFieldObj = IndexLegacy::getMissingField(txn, collection, idx->infoObj()); + BSONObj missingFieldObj = IndexLegacy::getMissingField(opCtx, collection, idx->infoObj()); BSONElement missingField = missingFieldObj.firstElement(); // for now, the only check is that all shard keys are filled @@ -180,7 +180,7 @@ public: // This is a fetch, but it's OK. The underlying code won't throw a page fault // exception. 
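// Illustrative sketch, not part of this commit: the intended lifecycle of the
// class declared above, using only members from this header. Error handling is
// elided, and 'migrateInfo', 'waitForDelete', and 'balancerIsSteppingDown' are
// assumed to exist in the hypothetical caller.
Status runOneTrackedMigration(OperationContext* opCtx,
                              const MigrateInfo& migrateInfo,
                              bool waitForDelete) {
    auto swRequest = ScopedMigrationRequest::writeMigration(opCtx, migrateInfo, waitForDelete);
    if (!swRequest.isOK()) {
        return swRequest.getStatus();  // nothing persisted, nothing to clean up
    }
    ScopedMigrationRequest scopedRequest = std::move(swRequest.getValue());

    // ... execute the migration here ...

    if (balancerIsSteppingDown()) {  // hypothetical condition
        // Leave the config.migrations document behind so a restarted balancer
        // can reattach to the migration via createForRecovery().
        scopedRequest.keepDocumentOnDestruct();
        return Status::OK();
    }

    // Otherwise the destructor removes the document with majority write concern.
    return Status::OK();
}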
- BSONObj obj = collection->docFor(txn, loc).value(); + BSONObj obj = collection->docFor(opCtx, loc).value(); BSONObjIterator j(keyPattern); BSONElement real; for (int x = 0; x <= k; x++) diff --git a/src/mongo/db/s/chunk_move_write_concern_options.cpp b/src/mongo/db/s/chunk_move_write_concern_options.cpp index 32ecdaf56dc..700f134a604 100644 --- a/src/mongo/db/s/chunk_move_write_concern_options.cpp +++ b/src/mongo/db/s/chunk_move_write_concern_options.cpp @@ -66,10 +66,10 @@ WriteConcernOptions getDefaultWriteConcernForMigration() { } // namespace StatusWith<WriteConcernOptions> ChunkMoveWriteConcernOptions::getEffectiveWriteConcern( - OperationContext* txn, const MigrationSecondaryThrottleOptions& options) { + OperationContext* opCtx, const MigrationSecondaryThrottleOptions& options) { auto secondaryThrottle = options.getSecondaryThrottle(); if (secondaryThrottle == MigrationSecondaryThrottleOptions::kDefault) { - if (txn->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) { + if (opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()) { secondaryThrottle = MigrationSecondaryThrottleOptions::kOff; } else { secondaryThrottle = MigrationSecondaryThrottleOptions::kOn; diff --git a/src/mongo/db/s/chunk_move_write_concern_options.h b/src/mongo/db/s/chunk_move_write_concern_options.h index b9734120b2a..e3b380a8634 100644 --- a/src/mongo/db/s/chunk_move_write_concern_options.h +++ b/src/mongo/db/s/chunk_move_write_concern_options.h @@ -60,7 +60,7 @@ public: * concern. */ static StatusWith<WriteConcernOptions> getEffectiveWriteConcern( - OperationContext* txn, const MigrationSecondaryThrottleOptions& options); + OperationContext* opCtx, const MigrationSecondaryThrottleOptions& options); }; } // namespace mongo diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp index 038ecbfbdeb..d6690f872ef 100644 --- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp @@ -72,7 +72,7 @@ enum CleanupResult { CleanupResult_Done, CleanupResult_Continue, CleanupResult_E * * If the collection is not sharded, returns CleanupResult_Done. 
*/ -CleanupResult cleanupOrphanedData(OperationContext* txn, +CleanupResult cleanupOrphanedData(OperationContext* opCtx, const NamespaceString& ns, const BSONObj& startingFromKeyConst, const WriteConcernOptions& secondaryThrottle, @@ -82,8 +82,8 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, ScopedCollectionMetadata metadata; { - AutoGetCollection autoColl(txn, ns, MODE_IS); - metadata = CollectionShardingState::get(txn, ns.toString())->getMetadata(); + AutoGetCollection autoColl(opCtx, ns, MODE_IS); + metadata = CollectionShardingState::get(opCtx, ns.toString())->getMetadata(); } if (!metadata || metadata->getKeyPattern().isEmpty()) { @@ -132,7 +132,7 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, deleterOptions.waitForOpenCursors = true; deleterOptions.removeSaverReason = "cleanup-cmd"; - if (!getDeleter()->deleteNow(txn, deleterOptions, errMsg)) { + if (!getDeleter()->deleteNow(opCtx, deleterOptions, errMsg)) { warning() << redact(*errMsg); return CleanupResult_Error; } @@ -203,7 +203,7 @@ public: // Output static BSONField<BSONObj> stoppedAtKeyField; - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, string const& db, BSONObj& cmdObj, int, @@ -227,9 +227,9 @@ public: const auto secondaryThrottle = uassertStatusOK(MigrationSecondaryThrottleOptions::createFromCommand(cmdObj)); const auto writeConcern = uassertStatusOK( - ChunkMoveWriteConcernOptions::getEffectiveWriteConcern(txn, secondaryThrottle)); + ChunkMoveWriteConcernOptions::getEffectiveWriteConcern(opCtx, secondaryThrottle)); - ShardingState* const shardingState = ShardingState::get(txn); + ShardingState* const shardingState = ShardingState::get(opCtx); if (!shardingState->enabled()) { errmsg = str::stream() << "server is not part of a sharded cluster or " @@ -238,7 +238,7 @@ public: } ChunkVersion shardVersion; - Status status = shardingState->refreshMetadataNow(txn, nss, &shardVersion); + Status status = shardingState->refreshMetadataNow(opCtx, nss, &shardVersion); if (!status.isOK()) { if (status.code() == ErrorCodes::RemoteChangeDetected) { warning() << "Shard version in transition detected while refreshing " @@ -251,7 +251,7 @@ public: BSONObj stoppedAtKey; CleanupResult cleanupResult = - cleanupOrphanedData(txn, nss, startingFromKey, writeConcern, &stoppedAtKey, &errmsg); + cleanupOrphanedData(opCtx, nss, startingFromKey, writeConcern, &stoppedAtKey, &errmsg); if (cleanupResult == CleanupResult_Error) { return false; diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index 778a28285d9..cdb469610bc 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -76,9 +76,9 @@ protected: auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), "test.foo", "shard0000", @@ -305,9 +305,9 @@ protected: auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), "test.foo", "shard0000", @@ -430,9 +430,9 @@ 
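// Illustrative sketch, not part of this commit: the driver loop implied by the
// CleanupResult protocol above. A caller re-issues the cleanup, feeding each
// returned 'stoppedAtKey' back in as the next 'startingFromKey', until no
// orphaned ranges remain. The function name and Status conversion are invented.
Status cleanupAllOrphanedRanges(OperationContext* opCtx,
                                const NamespaceString& nss,
                                const WriteConcernOptions& writeConcern) {
    BSONObj startingFromKey;  // empty => begin at the minimum shard key
    for (;;) {
        BSONObj stoppedAtKey;
        std::string errmsg;
        CleanupResult result = cleanupOrphanedData(
            opCtx, nss, startingFromKey, writeConcern, &stoppedAtKey, &errmsg);
        if (result == CleanupResult_Error) {
            return Status(ErrorCodes::OperationFailed, errmsg);
        }
        if (result == CleanupResult_Done) {
            return Status::OK();  // collection not sharded, or nothing left
        }
        startingFromKey = stoppedAtKey;  // CleanupResult_Continue
    }
}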
protected: auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), "test.foo", "shard0000", @@ -509,9 +509,9 @@ protected: auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), "test.foo", "shard0000", @@ -633,9 +633,9 @@ protected: auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), "test.foo", "shard0000", diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp index 30855a9b210..2349eb4a3bd 100644 --- a/src/mongo/db/s/collection_range_deleter.cpp +++ b/src/mongo/db/s/collection_range_deleter.cpp @@ -75,29 +75,29 @@ CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std:: void CollectionRangeDeleter::run() { Client::initThread(getThreadName().c_str()); ON_BLOCK_EXIT([&] { Client::destroy(); }); - auto txn = cc().makeOperationContext().get(); + auto opCtx = cc().makeOperationContext().get(); const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1); - bool hasNextRangeToClean = cleanupNextRange(txn, maxToDelete); + bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete); // If there are more ranges to run, we add <this> back onto the task executor to run again. 
if (hasNextRangeToClean) { - auto executor = ShardingState::get(txn)->getRangeDeleterTaskExecutor(); + auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor(); executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); }); } else { delete this; } } -bool CollectionRangeDeleter::cleanupNextRange(OperationContext* txn, int maxToDelete) { +bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) { { - AutoGetCollection autoColl(txn, _nss, MODE_IX); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); auto* collection = autoColl.getCollection(); if (!collection) { return false; } - auto* collectionShardingState = CollectionShardingState::get(txn, _nss); + auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss); dassert(collectionShardingState != nullptr); // every collection gets one auto& metadataManager = collectionShardingState->_metadataManager; @@ -117,7 +117,7 @@ bool CollectionRangeDeleter::cleanupNextRange(OperationContext* txn, int maxToDe auto scopedCollectionMetadata = collectionShardingState->getMetadata(); int numDocumentsDeleted = - _doDeletion(txn, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete); + _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete); if (numDocumentsDeleted <= 0) { metadataManager.removeRangeToClean(_rangeInProgress.get()); _rangeInProgress = boost::none; @@ -127,8 +127,9 @@ bool CollectionRangeDeleter::cleanupNextRange(OperationContext* txn, int maxToDe // wait for replication WriteConcernResult wcResult; - auto currentClientOpTime = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); - Status status = waitForWriteConcern(txn, currentClientOpTime, kMajorityWriteConcern, &wcResult); + auto currentClientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + Status status = + waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult); if (!status.isOK()) { warning() << "Error when waiting for write concern after removing chunks in " << _nss << " : " << status.reason(); @@ -137,7 +138,7 @@ bool CollectionRangeDeleter::cleanupNextRange(OperationContext* txn, int maxToDe return true; } -int CollectionRangeDeleter::_doDeletion(OperationContext* txn, +int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx, Collection* collection, const BSONObj& keyPattern, int maxToDelete) { @@ -147,7 +148,7 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* txn, // The IndexChunk has a keyPattern that may apply to more than one index - we need to // select the index and get the full index keyPattern here. 
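// Illustrative sketch, not part of this commit: the self-rescheduling pattern
// in CollectionRangeDeleter::run() above, in generic form. A heap-allocated
// task re-posts itself onto the executor while work remains and deletes itself
// on the final pass, so the executor callback chain is its sole owner.
// 'SelfReschedulingTask' and 'doOneBatch' are invented for illustration.
class SelfReschedulingTask {
public:
    void run() {
        if (doOneBatch()) {
            _executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); });
        } else {
            delete this;  // no further callbacks can reference this task
        }
    }

private:
    bool doOneBatch();  // hypothetical: returns true while work remains

    executor::TaskExecutor* _executor;
};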
const IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, keyPattern, false); + collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false); if (idx == NULL) { warning() << "Unable to find shard key index for " << keyPattern.toString() << " in " << _nss; @@ -165,7 +166,7 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* txn, LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss; auto indexName = idx->indexName(); - IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(txn, indexName); + IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName); if (!desc) { warning() << "shard key index with name " << indexName << " on '" << _nss << "' was dropped"; @@ -174,7 +175,7 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* txn, int numDeleted = 0; do { - auto exec = InternalPlanner::indexScan(txn, + auto exec = InternalPlanner::indexScan(opCtx, collection, desc, min, @@ -198,14 +199,14 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* txn, } invariant(PlanExecutor::ADVANCED == state); - WriteUnitOfWork wuow(txn); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, _nss)) { + WriteUnitOfWork wuow(opCtx); + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { warning() << "stepped down from primary while deleting chunk; orphaning data in " << _nss << " in range [" << min << ", " << max << ")"; break; } OpDebug* const nullOpDebug = nullptr; - collection->deleteDocument(txn, rloc, nullOpDebug, true); + collection->deleteDocument(opCtx, rloc, nullOpDebug, true); wuow.commit(); } while (++numDeleted < maxToDelete); return numDeleted; diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h index 4cb52d1ee3f..f611215a73d 100644 --- a/src/mongo/db/s/collection_range_deleter.h +++ b/src/mongo/db/s/collection_range_deleter.h @@ -56,7 +56,7 @@ public: * Returns true if there are more entries in rangesToClean, false if there is no more progress * to be made. */ - bool cleanupNextRange(OperationContext* txn, int maxToDelete); + bool cleanupNextRange(OperationContext* opCtx, int maxToDelete); private: /** @@ -65,7 +65,7 @@ private: * * Returns the number of documents deleted (0 if deletion is finished), or -1 for error. 
*/ - int _doDeletion(OperationContext* txn, + int _doDeletion(OperationContext* opCtx, Collection* collection, const BSONObj& keyPattern, int maxToDelete); diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index e35c94e6352..24746d7880e 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -66,18 +66,18 @@ using std::string; */ class ShardIdentityLogOpHandler final : public RecoveryUnit::Change { public: - ShardIdentityLogOpHandler(OperationContext* txn, ShardIdentityType shardIdentity) - : _txn(txn), _shardIdentity(std::move(shardIdentity)) {} + ShardIdentityLogOpHandler(OperationContext* opCtx, ShardIdentityType shardIdentity) + : _opCtx(opCtx), _shardIdentity(std::move(shardIdentity)) {} void commit() override { - fassertNoTrace(40071, - ShardingState::get(_txn)->initializeFromShardIdentity(_txn, _shardIdentity)); + fassertNoTrace( + 40071, ShardingState::get(_opCtx)->initializeFromShardIdentity(_opCtx, _shardIdentity)); } void rollback() override {} private: - OperationContext* _txn; + OperationContext* _opCtx; const ShardIdentityType _shardIdentity; }; @@ -90,27 +90,27 @@ CollectionShardingState::~CollectionShardingState() { invariant(!_sourceMgr); } -CollectionShardingState* CollectionShardingState::get(OperationContext* txn, +CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx, const NamespaceString& nss) { - return CollectionShardingState::get(txn, nss.ns()); + return CollectionShardingState::get(opCtx, nss.ns()); } -CollectionShardingState* CollectionShardingState::get(OperationContext* txn, +CollectionShardingState* CollectionShardingState::get(OperationContext* opCtx, const std::string& ns) { // Collection lock must be held to have a reference to the collection's sharding state - dassert(txn->lockState()->isCollectionLockedForMode(ns, MODE_IS)); + dassert(opCtx->lockState()->isCollectionLockedForMode(ns, MODE_IS)); - ShardingState* const shardingState = ShardingState::get(txn); - return shardingState->getNS(ns, txn); + ShardingState* const shardingState = ShardingState::get(opCtx); + return shardingState->getNS(ns, opCtx); } ScopedCollectionMetadata CollectionShardingState::getMetadata() { return _metadataManager.getActiveMetadata(); } -void CollectionShardingState::refreshMetadata(OperationContext* txn, +void CollectionShardingState::refreshMetadata(OperationContext* opCtx, std::unique_ptr<CollectionMetadata> newMetadata) { - invariant(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); + invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); _metadataManager.refreshActiveMetadata(std::move(newMetadata)); } @@ -131,27 +131,27 @@ MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() { return _sourceMgr; } -void CollectionShardingState::setMigrationSourceManager(OperationContext* txn, +void CollectionShardingState::setMigrationSourceManager(OperationContext* opCtx, MigrationSourceManager* sourceMgr) { - invariant(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); + invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); invariant(sourceMgr); invariant(!_sourceMgr); _sourceMgr = sourceMgr; } -void CollectionShardingState::clearMigrationSourceManager(OperationContext* txn) { - invariant(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); +void CollectionShardingState::clearMigrationSourceManager(OperationContext* opCtx) { + 
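// Illustrative sketch, not part of this commit: the batching contract between
// cleanupNextRange and _doDeletion above. At most 'maxToDelete' documents are
// removed per pass, each in its own committed WriteUnitOfWork so removals
// replicate individually, and a return of <= 0 tells the caller the range is
// finished. 'fetchNextRecordInRange' and 'deleteRecord' are hypothetical
// stand-ins for the index-scan and collection-delete steps shown above.
bool fetchNextRecordInRange(OperationContext* opCtx, RecordId* rloc);
void deleteRecord(OperationContext* opCtx, const RecordId& rloc);

int doDeletionSketch(OperationContext* opCtx, int maxToDelete) {
    int numDeleted = 0;
    do {
        RecordId rloc;
        if (!fetchNextRecordInRange(opCtx, &rloc)) {
            break;  // scan exhausted; 0 on a fresh pass means the range is done
        }
        WriteUnitOfWork wuow(opCtx);
        deleteRecord(opCtx, rloc);
        wuow.commit();
    } while (++numDeleted < maxToDelete);
    return numDeleted;
}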
invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X)); invariant(_sourceMgr); _sourceMgr = nullptr; } -void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) { +void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx) { string errmsg; ChunkVersion received; ChunkVersion wanted; - if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) { + if (!_checkShardVersionOk(opCtx, &errmsg, &received, &wanted)) { throw SendStaleConfigException( _nss.ns(), str::stream() << "[" << _nss.ns() << "] shard version not ok: " << errmsg, @@ -172,19 +172,19 @@ bool CollectionShardingState::collectionIsSharded() { return true; } -bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, +bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc) { - dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (_sourceMgr) { - return _sourceMgr->getCloner()->isDocumentInMigratingChunk(txn, doc); + return _sourceMgr->getCloner()->isDocumentInMigratingChunk(opCtx, doc); } return false; } -void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) { - dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); +void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && _nss == NamespaceString::kConfigCollectionNamespace) { @@ -192,32 +192,32 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i if (idElem.str() == ShardIdentityType::IdName) { auto shardIdentityDoc = uassertStatusOK(ShardIdentityType::fromBSON(insertedDoc)); uassertStatusOK(shardIdentityDoc.validate()); - txn->recoveryUnit()->registerChange( - new ShardIdentityLogOpHandler(txn, std::move(shardIdentityDoc))); + opCtx->recoveryUnit()->registerChange( + new ShardIdentityLogOpHandler(opCtx, std::move(shardIdentityDoc))); } } } - checkShardVersionOrThrow(txn); + checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onInsertOp(txn, insertedDoc); + _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc); } } -void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) { - dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); +void CollectionShardingState::onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) { + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); - checkShardVersionOrThrow(txn); + checkShardVersionOrThrow(opCtx); if (_sourceMgr) { - _sourceMgr->getCloner()->onUpdateOp(txn, updatedDoc); + _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc); } } -void CollectionShardingState::onDeleteOp(OperationContext* txn, +void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const CollectionShardingState::DeleteState& deleteState) { - dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && _nss == NamespaceString::kConfigCollectionNamespace) { @@ -225,13 +225,13 @@ void CollectionShardingState::onDeleteOp(OperationContext* txn, if (auto idElem = deleteState.idDoc["_id"]) { auto idStr = 
idElem.str(); if (idStr == ShardIdentityType::IdName) { - if (!repl::ReplicationCoordinator::get(txn)->getMemberState().rollback()) { + if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { uasserted(40070, "cannot delete shardIdentity document while in --shardsvr mode"); } else { warning() << "Shard identity document rolled back. Will shut down after " "finishing rollback."; - ShardIdentityRollbackNotifier::get(txn)->recordThatRollbackHappened(); + ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); } } } @@ -239,70 +239,74 @@ void CollectionShardingState::onDeleteOp(OperationContext* txn, if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { if (_nss == VersionType::ConfigNS) { - if (!repl::ReplicationCoordinator::get(txn)->getMemberState().rollback()) { + if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { uasserted(40302, "cannot delete config.version document while in --configsvr mode"); } else { // Throw out any cached information related to the cluster ID. - Grid::get(txn)->catalogManager()->discardCachedConfigDatabaseInitializationState(); - ClusterIdentityLoader::get(txn)->discardCachedClusterId(); + Grid::get(opCtx) + ->catalogManager() + ->discardCachedConfigDatabaseInitializationState(); + ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); } } } - checkShardVersionOrThrow(txn); + checkShardVersionOrThrow(opCtx); if (_sourceMgr && deleteState.isMigrating) { - _sourceMgr->getCloner()->onDeleteOp(txn, deleteState.idDoc); + _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.idDoc); } } -void CollectionShardingState::onDropCollection(OperationContext* txn, +void CollectionShardingState::onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName) { - dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); if (serverGlobalParams.clusterRole == ClusterRole::ShardServer && _nss == NamespaceString::kConfigCollectionNamespace) { // Dropping system collections is not allowed for end users. - invariant(!txn->writesAreReplicated()); - invariant(repl::ReplicationCoordinator::get(txn)->getMemberState().rollback()); + invariant(!opCtx->writesAreReplicated()); + invariant(repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()); // Can't confirm whether there was a ShardIdentity document or not yet, so assume there was // one and shut down the process to clear the in-memory sharding state. warning() << "admin.system.version collection rolled back. Will shut down after " "finishing rollback"; - ShardIdentityRollbackNotifier::get(txn)->recordThatRollbackHappened(); + ShardIdentityRollbackNotifier::get(opCtx)->recordThatRollbackHappened(); } if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { if (_nss == VersionType::ConfigNS) { - if (!repl::ReplicationCoordinator::get(txn)->getMemberState().rollback()) { + if (!repl::ReplicationCoordinator::get(opCtx)->getMemberState().rollback()) { uasserted(40303, "cannot drop config.version document while in --configsvr mode"); } else { // Throw out any cached information related to the cluster ID. 
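// Illustrative sketch, not part of this commit: the commit-hook pattern used
// by ShardIdentityLogOpHandler above. A RecoveryUnit::Change registered during
// a write runs commit() only if the enclosing WriteUnitOfWork commits, which
// is why sharding-state initialization is deferred until the insert is durable
// in the storage layer. 'OnCommitLogger' is an invented example handler.
class OnCommitLogger final : public RecoveryUnit::Change {
public:
    void commit() override {
        LOG(1) << "write committed; safe to apply side effects";
    }

    void rollback() override {
        // Nothing was applied in commit(), so there is nothing to undo.
    }
};

// registerChange takes ownership, as with ShardIdentityLogOpHandler above:
// opCtx->recoveryUnit()->registerChange(new OnCommitLogger());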
- Grid::get(txn)->catalogManager()->discardCachedConfigDatabaseInitializationState(); - ClusterIdentityLoader::get(txn)->discardCachedClusterId(); + Grid::get(opCtx) + ->catalogManager() + ->discardCachedConfigDatabaseInitializationState(); + ClusterIdentityLoader::get(opCtx)->discardCachedClusterId(); } } } } -bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, +bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx, string* errmsg, ChunkVersion* expectedShardVersion, ChunkVersion* actualShardVersion) { - Client* client = txn->getClient(); + Client* client = opCtx->getClient(); // Operations using the DBDirectClient are unversioned. if (client->isInDirectClient()) { return true; } - if (!repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase(txn, _nss.db())) { + if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, _nss.db())) { // Right now connections to secondaries aren't versioned at all. return true; } - const auto& oss = OperationShardingState::get(txn); + const auto& oss = OperationShardingState::get(opCtx); // If there is a version attached to the OperationContext, use it as the received version. // Otherwise, get the received version from the ShardedConnectionInfo. @@ -311,8 +315,9 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, } else { ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); if (!info) { - // There is no shard version information on either 'txn' or 'client'. This means that - // the operation represented by 'txn' is unversioned, and the shard version is always OK + // There is no shard version information on either 'opCtx' or 'client'. This means that + // the operation represented by 'opCtx' is unversioned, and the shard version is always + // OK // for unversioned operations. return true; } @@ -333,7 +338,7 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, // Set migration critical section on operation sharding state: operation will wait for the // migration to finish before returning failure and retrying. - OperationShardingState::get(txn).setMigrationCriticalSectionSignal( + OperationShardingState::get(opCtx).setMigrationCriticalSectionSignal( _sourceMgr->getMigrationCriticalSectionSignal()); return false; } diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 65e7bf6a882..5bbc2b9c576 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -81,8 +81,8 @@ public: * Must be called with some lock held on the specific collection being looked up and the * returned pointer should never be stored. */ - static CollectionShardingState* get(OperationContext* txn, const NamespaceString& nss); - static CollectionShardingState* get(OperationContext* txn, const std::string& ns); + static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss); + static CollectionShardingState* get(OperationContext* opCtx, const std::string& ns); /** * Returns the chunk metadata for the collection. @@ -96,7 +96,7 @@ public: * * Must always be called with an exclusive collection lock. 
*/ - void refreshMetadata(OperationContext* txn, std::unique_ptr<CollectionMetadata> newMetadata); + void refreshMetadata(OperationContext* opCtx, std::unique_ptr<CollectionMetadata> newMetadata); /** * Marks the collection as not sharded at stepdown time so that no filtering will occur for @@ -128,14 +128,14 @@ public: * collection X lock. May not be called if there is a migration source manager already * installed. Must be followed by a call to clearMigrationSourceManager. */ - void setMigrationSourceManager(OperationContext* txn, MigrationSourceManager* sourceMgr); + void setMigrationSourceManager(OperationContext* opCtx, MigrationSourceManager* sourceMgr); /** * Removes a migration source manager from this collection's sharding state. Must be called with * collection X lock. May not be called if there isn't a migration source manager installed * already through a previous call to setMigrationSourceManager. */ - void clearMigrationSourceManager(OperationContext* txn); + void clearMigrationSourceManager(OperationContext* opCtx); /** * Checks whether the shard version in the context is compatible with the shard version of the @@ -146,7 +146,7 @@ public: * response is constructed, this function should be the only means of checking for shard version * match. */ - void checkShardVersionOrThrow(OperationContext* txn); + void checkShardVersionOrThrow(OperationContext* opCtx); /** * Returns whether this collection is sharded. Valid only if mongoD is primary. @@ -157,15 +157,15 @@ public: // Replication subsystem hooks. If this collection is serving as a source for migration, these // methods inform it of any changes to its contents. - bool isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc); + bool isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc); - void onInsertOp(OperationContext* txn, const BSONObj& insertedDoc); + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc); - void onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc); + void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc); - void onDeleteOp(OperationContext* txn, const DeleteState& deleteState); + void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState); - void onDropCollection(OperationContext* txn, const NamespaceString& collectionName); + void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName); MetadataManager* getMetadataManagerForTest() { return &_metadataManager; @@ -176,7 +176,7 @@ private: /** * Checks whether the shard version of the operation matches that of the collection. * - * txn - Operation context from which to retrieve the operation's expected version. + * opCtx - Operation context from which to retrieve the operation's expected version. * errmsg (out) - On false return contains an explanatory error message. * expectedShardVersion (out) - On false return contains the expected collection version on this * shard. Obtained from the operation sharding state. @@ -186,7 +186,7 @@ private: * Returns true if the expected collection version on the shard matches its actual version on * the shard and false otherwise. Upon false return, the output parameters will be set. 
*/ - bool _checkShardVersionOk(OperationContext* txn, + bool _checkShardVersionOk(OperationContext* opCtx, std::string* errmsg, ChunkVersion* expectedShardVersion, ChunkVersion* actualShardVersion); diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index 8a7ca715141..51666a7800f 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -61,7 +61,7 @@ public: // Note: this assumes that globalInit will always be called on the same thread as the main // test thread. - ShardingState::get(txn())->setGlobalInitMethodForTest( + ShardingState::get(opCtx())->setGlobalInitMethodForTest( [this](OperationContext*, const ConnectionString&, StringData) { _initCallCount++; return Status::OK(); @@ -70,7 +70,7 @@ public: void tearDown() override {} - OperationContext* txn() { + OperationContext* opCtx() { return _opCtx.get(); } @@ -102,8 +102,8 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) { shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - WriteUnitOfWork wuow(txn()); - collShardingState.onInsertOp(txn(), shardIdentity.toBSON()); + WriteUnitOfWork wuow(opCtx()); + collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -123,8 +123,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) { shardIdentity.setClusterId(OID::gen()); { - WriteUnitOfWork wuow(txn()); - collShardingState.onInsertOp(txn(), shardIdentity.toBSON()); + WriteUnitOfWork wuow(opCtx()); + collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); } @@ -141,8 +141,8 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit shardIdentity.setShardName("a"); shardIdentity.setClusterId(OID::gen()); - WriteUnitOfWork wuow(txn()); - collShardingState.onInsertOp(txn(), shardIdentity.toBSON()); + WriteUnitOfWork wuow(opCtx()); + collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()); ASSERT_EQ(0, getInitCallCount()); @@ -158,15 +158,16 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument ShardIdentityType shardIdentity; shardIdentity.setShardName("a"); - ASSERT_THROWS(collShardingState.onInsertOp(txn(), shardIdentity.toBSON()), AssertionException); + ASSERT_THROWS(collShardingState.onInsertOp(opCtx(), shardIdentity.toBSON()), + AssertionException); } TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNotInserted) { CollectionShardingState collShardingState(getServiceContext(), NamespaceString::kConfigCollectionNamespace); - WriteUnitOfWork wuow(txn()); - collShardingState.onInsertOp(txn(), BSON("_id" << 1)); + WriteUnitOfWork wuow(opCtx()); + collShardingState.onInsertOp(opCtx(), BSON("_id" << 1)); ASSERT_EQ(0, getInitCallCount()); diff --git a/src/mongo/db/s/config/configsvr_add_shard_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_command.cpp index 9f9b349b4df..5cfc614a816 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_command.cpp @@ -86,7 +86,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -105,7 +105,7 @@ public: } auto parsedRequest = std::move(swParsedRequest.getValue()); - auto replCoord = repl::ReplicationCoordinator::get(txn); + auto replCoord = 
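// Illustrative sketch, not part of this commit: the precedence implemented by
// _checkShardVersionOk above, condensed. 'receivedShardVersion' is an invented
// helper standing in for the OperationShardingState / ShardedConnectionInfo
// fallback spelled out above, and the final equality check simplifies the real
// comparison and migration critical-section handling.
boost::optional<ChunkVersion> receivedShardVersion(OperationContext* opCtx);  // hypothetical

bool shardVersionOkSketch(OperationContext* opCtx,
                          const NamespaceString& nss,
                          const ChunkVersion& activeMetadataVersion) {
    Client* client = opCtx->getClient();
    if (client->isInDirectClient()) {
        return true;  // DBDirectClient operations are unversioned
    }
    if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase(opCtx, nss.db())) {
        return true;  // connections to secondaries aren't versioned at all
    }
    auto received = receivedShardVersion(opCtx);
    if (!received) {
        return true;  // unversioned operation: shard version is always OK
    }
    return *received == activeMetadataVersion;
}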
repl::ReplicationCoordinator::get(opCtx); auto rsConfig = replCoord->getConfig(); auto validationStatus = parsedRequest.validate(rsConfig.isLocalHostAllowed()); @@ -119,8 +119,8 @@ public: parsedRequest.hasMaxSize() ? parsedRequest.getMaxSize() : kMaxSizeMBDefault); - StatusWith<string> addShardResult = Grid::get(txn)->catalogManager()->addShard( - txn, + StatusWith<string> addShardResult = Grid::get(opCtx)->catalogManager()->addShard( + opCtx, parsedRequest.hasName() ? &parsedRequest.getName() : nullptr, parsedRequest.getConnString(), parsedRequest.hasMaxSize() ? parsedRequest.getMaxSize() : kMaxSizeMBDefault); diff --git a/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp b/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp index 1b0a3db4148..236b2409af1 100644 --- a/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp +++ b/src/mongo/db/s/config/configsvr_add_shard_to_zone_command.cpp @@ -87,7 +87,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -100,8 +100,8 @@ public: auto parsedRequest = uassertStatusOK(AddShardToZoneRequest::parseFromConfigCommand(cmdObj)); - uassertStatusOK(Grid::get(txn)->catalogManager()->addShardToZone( - txn, parsedRequest.getShardName(), parsedRequest.getZoneName())); + uassertStatusOK(Grid::get(opCtx)->catalogManager()->addShardToZone( + opCtx, parsedRequest.getShardName(), parsedRequest.getZoneName())); return true; } diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp index 6e0f96328ee..5144be21703 100644 --- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp +++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp @@ -117,7 +117,7 @@ public: return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& dbName, BSONObj& cmdObj, int options, @@ -129,8 +129,8 @@ public: auto commitRequest = uassertStatusOK(CommitChunkMigrationRequest::createFromCommand(nss, cmdObj)); - StatusWith<BSONObj> response = Grid::get(txn)->catalogManager()->commitChunkMigration( - txn, + StatusWith<BSONObj> response = Grid::get(opCtx)->catalogManager()->commitChunkMigration( + opCtx, nss, commitRequest.getMigratedChunk(), commitRequest.getControlChunk(), diff --git a/src/mongo/db/s/config/configsvr_control_balancer_command.cpp b/src/mongo/db/s/config/configsvr_control_balancer_command.cpp index 88deff30a9e..f4905406f78 100644 --- a/src/mongo/db/s/config/configsvr_control_balancer_command.cpp +++ b/src/mongo/db/s/config/configsvr_control_balancer_command.cpp @@ -72,7 +72,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -87,13 +87,13 @@ public: str::stream() << getName() << " can only be run on config servers", serverGlobalParams.clusterRole == ClusterRole::ConfigServer); - _run(txn, &result); + _run(opCtx, &result); return true; } private: - virtual void _run(OperationContext* txn, BSONObjBuilder* result) = 0; + virtual void _run(OperationContext* opCtx, BSONObjBuilder* result) = 0; }; class ConfigSvrBalancerStartCommand : public ConfigSvrBalancerControlCommand { @@ -101,9 +101,9 @@ public: ConfigSvrBalancerStartCommand() : ConfigSvrBalancerControlCommand("_configsvrBalancerStart") {} private: - void 
_run(OperationContext* txn, BSONObjBuilder* result) override { - uassertStatusOK(Grid::get(txn)->getBalancerConfiguration()->setBalancerMode( - txn, BalancerSettingsType::kFull)); + void _run(OperationContext* opCtx, BSONObjBuilder* result) override { + uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->setBalancerMode( + opCtx, BalancerSettingsType::kFull)); } }; @@ -112,10 +112,10 @@ public: ConfigSvrBalancerStopCommand() : ConfigSvrBalancerControlCommand("_configsvrBalancerStop") {} private: - void _run(OperationContext* txn, BSONObjBuilder* result) override { - uassertStatusOK(Grid::get(txn)->getBalancerConfiguration()->setBalancerMode( - txn, BalancerSettingsType::kOff)); - Balancer::get(txn)->joinCurrentRound(txn); + void _run(OperationContext* opCtx, BSONObjBuilder* result) override { + uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->setBalancerMode( + opCtx, BalancerSettingsType::kOff)); + Balancer::get(opCtx)->joinCurrentRound(opCtx); } }; @@ -125,8 +125,8 @@ public: : ConfigSvrBalancerControlCommand("_configsvrBalancerStatus") {} private: - void _run(OperationContext* txn, BSONObjBuilder* result) override { - Balancer::get(txn)->report(txn, result); + void _run(OperationContext* opCtx, BSONObjBuilder* result) override { + Balancer::get(opCtx)->report(opCtx, result); } }; diff --git a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp index 8988a5e4e4e..7d4dfc12b87 100644 --- a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp +++ b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp @@ -98,7 +98,7 @@ public: return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& dbName, BSONObj& cmdObj, int options, @@ -112,11 +112,11 @@ public: auto parsedRequest = uassertStatusOK(MergeChunkRequest::parseFromConfigCommand(cmdObj)); Status mergeChunkResult = - Grid::get(txn)->catalogManager()->commitChunkMerge(txn, - parsedRequest.getNamespace(), - parsedRequest.getEpoch(), - parsedRequest.getChunkBoundaries(), - parsedRequest.getShardName()); + Grid::get(opCtx)->catalogManager()->commitChunkMerge(opCtx, + parsedRequest.getNamespace(), + parsedRequest.getEpoch(), + parsedRequest.getChunkBoundaries(), + parsedRequest.getShardName()); if (!mergeChunkResult.isOK()) { return appendCommandStatus(result, mergeChunkResult); diff --git a/src/mongo/db/s/config/configsvr_move_chunk_command.cpp b/src/mongo/db/s/config/configsvr_move_chunk_command.cpp index 0e64207a217..8b4fe32025e 100644 --- a/src/mongo/db/s/config/configsvr_move_chunk_command.cpp +++ b/src/mongo/db/s/config/configsvr_move_chunk_command.cpp @@ -78,7 +78,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -87,14 +87,14 @@ public: auto request = uassertStatusOK(BalanceChunkRequest::parseFromConfigCommand(cmdObj)); if (request.hasToShardId()) { - uassertStatusOK(Balancer::get(txn)->moveSingleChunk(txn, - request.getChunk(), - request.getToShardId(), - request.getMaxChunkSizeBytes(), - request.getSecondaryThrottle(), - request.getWaitForDelete())); + uassertStatusOK(Balancer::get(opCtx)->moveSingleChunk(opCtx, + request.getChunk(), + request.getToShardId(), + request.getMaxChunkSizeBytes(), + request.getSecondaryThrottle(), + request.getWaitForDelete())); } else { - uassertStatusOK(Balancer::get(txn)->rebalanceSingleChunk(txn, request.getChunk())); + 
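// Illustrative sketch, not part of this commit: the template-method shape
// shared by the balancer control commands above. The base class checks the
// config-server role once, then defers to the subclass's private
// _run(opCtx, result). 'ConfigSvrBalancerPingCommand' is an invented fourth
// command, shown only to make the extension point explicit.
class ConfigSvrBalancerPingCommand : public ConfigSvrBalancerControlCommand {
public:
    ConfigSvrBalancerPingCommand() : ConfigSvrBalancerControlCommand("_configsvrBalancerPing") {}

private:
    void _run(OperationContext* opCtx, BSONObjBuilder* result) override {
        result->appendBool("balancerControlReachable", true);  // illustrative only
    }
};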
uassertStatusOK(Balancer::get(opCtx)->rebalanceSingleChunk(opCtx, request.getChunk())); } return true; diff --git a/src/mongo/db/s/config/configsvr_remove_shard_from_zone_command.cpp b/src/mongo/db/s/config/configsvr_remove_shard_from_zone_command.cpp index 376bf5cfd5f..ae83006e471 100644 --- a/src/mongo/db/s/config/configsvr_remove_shard_from_zone_command.cpp +++ b/src/mongo/db/s/config/configsvr_remove_shard_from_zone_command.cpp @@ -87,7 +87,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -101,8 +101,8 @@ public: auto parsedRequest = uassertStatusOK(RemoveShardFromZoneRequest::parseFromConfigCommand(cmdObj)); - uassertStatusOK(Grid::get(txn)->catalogManager()->removeShardFromZone( - txn, parsedRequest.getShardName(), parsedRequest.getZoneName())); + uassertStatusOK(Grid::get(opCtx)->catalogManager()->removeShardFromZone( + opCtx, parsedRequest.getShardName(), parsedRequest.getZoneName())); return true; } diff --git a/src/mongo/db/s/config/configsvr_set_feature_compatibility_version_command.cpp b/src/mongo/db/s/config/configsvr_set_feature_compatibility_version_command.cpp index 694cec5f96c..8e1fa7825e8 100644 --- a/src/mongo/db/s/config/configsvr_set_feature_compatibility_version_command.cpp +++ b/src/mongo/db/s/config/configsvr_set_feature_compatibility_version_command.cpp @@ -81,7 +81,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -97,11 +97,11 @@ public: serverGlobalParams.clusterRole == ClusterRole::ConfigServer); // Forward to all shards. - uassertStatusOK( - Grid::get(txn)->catalogManager()->setFeatureCompatibilityVersionOnShards(txn, version)); + uassertStatusOK(Grid::get(opCtx)->catalogManager()->setFeatureCompatibilityVersionOnShards( + opCtx, version)); // On success, set featureCompatibilityVersion on self. 
- FeatureCompatibilityVersion::set(txn, version); + FeatureCompatibilityVersion::set(opCtx, version); return true; } diff --git a/src/mongo/db/s/config/configsvr_split_chunk_command.cpp b/src/mongo/db/s/config/configsvr_split_chunk_command.cpp index a8744987929..0b3cfe6f40d 100644 --- a/src/mongo/db/s/config/configsvr_split_chunk_command.cpp +++ b/src/mongo/db/s/config/configsvr_split_chunk_command.cpp @@ -96,7 +96,7 @@ public: return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& dbName, BSONObj& cmdObj, int options, @@ -110,12 +110,12 @@ public: auto parsedRequest = uassertStatusOK(SplitChunkRequest::parseFromConfigCommand(cmdObj)); Status splitChunkResult = - Grid::get(txn)->catalogManager()->commitChunkSplit(txn, - parsedRequest.getNamespace(), - parsedRequest.getEpoch(), - parsedRequest.getChunkRange(), - parsedRequest.getSplitPoints(), - parsedRequest.getShardName()); + Grid::get(opCtx)->catalogManager()->commitChunkSplit(opCtx, + parsedRequest.getNamespace(), + parsedRequest.getEpoch(), + parsedRequest.getChunkRange(), + parsedRequest.getSplitPoints(), + parsedRequest.getShardName()); if (!splitChunkResult.isOK()) { return appendCommandStatus(result, splitChunkResult); } diff --git a/src/mongo/db/s/config/configsvr_update_zone_key_range_command.cpp b/src/mongo/db/s/config/configsvr_update_zone_key_range_command.cpp index 09ff5f8bf74..36d68576568 100644 --- a/src/mongo/db/s/config/configsvr_update_zone_key_range_command.cpp +++ b/src/mongo/db/s/config/configsvr_update_zone_key_range_command.cpp @@ -89,7 +89,7 @@ public: return Status::OK(); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& unusedDbName, BSONObj& cmdObj, int options, @@ -109,11 +109,11 @@ public: } if (parsedRequest.isRemove()) { - uassertStatusOK(Grid::get(txn)->catalogManager()->removeKeyRangeFromZone( - txn, parsedRequest.getNS(), parsedRequest.getRange())); + uassertStatusOK(Grid::get(opCtx)->catalogManager()->removeKeyRangeFromZone( + opCtx, parsedRequest.getNS(), parsedRequest.getRange())); } else { - uassertStatusOK(Grid::get(txn)->catalogManager()->assignKeyRangeToZone( - txn, parsedRequest.getNS(), parsedRequest.getRange(), zoneName)); + uassertStatusOK(Grid::get(opCtx)->catalogManager()->assignKeyRangeToZone( + opCtx, parsedRequest.getNS(), parsedRequest.getRange(), zoneName)); } return true; diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp index a14732867d7..86796a4ef50 100644 --- a/src/mongo/db/s/get_shard_version_command.cpp +++ b/src/mongo/db/s/get_shard_version_command.cpp @@ -82,7 +82,7 @@ public: return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& dbname, BSONObj& cmdObj, int options, @@ -93,14 +93,14 @@ public: str::stream() << nss.ns() << " is not a valid namespace", nss.isValid()); - ShardingState* const gss = ShardingState::get(txn); + ShardingState* const gss = ShardingState::get(opCtx); if (gss->enabled()) { - result.append("configServer", gss->getConfigServer(txn).toString()); + result.append("configServer", gss->getConfigServer(opCtx).toString()); } else { result.append("configServer", ""); } - ShardedConnectionInfo* const sci = ShardedConnectionInfo::get(txn->getClient(), false); + ShardedConnectionInfo* const sci = ShardedConnectionInfo::get(opCtx->getClient(), false); result.appendBool("inShardedMode", sci != nullptr); if (sci) { 
result.appendTimestamp("mine", sci->getVersion(nss.ns()).toLong()); @@ -108,8 +108,8 @@ public: result.appendTimestamp("mine", 0); } - AutoGetCollection autoColl(txn, nss, MODE_IS); - CollectionShardingState* const css = CollectionShardingState::get(txn, nss); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + CollectionShardingState* const css = CollectionShardingState::get(opCtx, nss); ScopedCollectionMetadata metadata; if (css) { diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index 242c9d5fc7f..f03e7948374 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -57,16 +57,16 @@ using std::vector; namespace { -bool _checkMetadataForSuccess(OperationContext* txn, +bool _checkMetadataForSuccess(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey, const BSONObj& maxKey) { ScopedCollectionMetadata metadataAfterMerge; { - AutoGetCollection autoColl(txn, nss, MODE_IS); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); // Get collection metadata - metadataAfterMerge = CollectionShardingState::get(txn, nss.ns())->getMetadata(); + metadataAfterMerge = CollectionShardingState::get(opCtx, nss.ns())->getMetadata(); } ChunkType chunk; @@ -77,7 +77,7 @@ bool _checkMetadataForSuccess(OperationContext* txn, return chunk.getMin().woCompare(minKey) == 0 && chunk.getMax().woCompare(maxKey) == 0; } -Status mergeChunks(OperationContext* txn, +Status mergeChunks(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey, const BSONObj& maxKey, @@ -86,8 +86,8 @@ Status mergeChunks(OperationContext* txn, // TODO(SERVER-25086): Remove distLock acquisition from merge chunk const string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to " << maxKey; - auto scopedDistLock = grid.catalogClient(txn)->getDistLockManager()->lock( - txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); + auto scopedDistLock = grid.catalogClient(opCtx)->getDistLockManager()->lock( + opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); if (!scopedDistLock.isOK()) { std::string errmsg = stream() << "could not acquire collection lock for " << nss.ns() @@ -99,14 +99,14 @@ Status mergeChunks(OperationContext* txn, return Status(scopedDistLock.getStatus().code(), errmsg); } - ShardingState* shardingState = ShardingState::get(txn); + ShardingState* shardingState = ShardingState::get(opCtx); // // We now have the collection lock, refresh metadata to latest version and sanity check // ChunkVersion shardVersion; - Status refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersion); + Status refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &shardVersion); if (!refreshStatus.isOK()) { std::string errmsg = str::stream() @@ -130,9 +130,9 @@ Status mergeChunks(OperationContext* txn, ScopedCollectionMetadata metadata; { - AutoGetCollection autoColl(txn, nss, MODE_IS); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); - metadata = CollectionShardingState::get(txn, nss.ns())->getMetadata(); + metadata = CollectionShardingState::get(opCtx, nss.ns())->getMetadata(); if (!metadata || metadata->getKeyPattern().isEmpty()) { std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns() << " is not sharded"; @@ -262,8 +262,8 @@ Status mergeChunks(OperationContext* txn, auto configCmdObj = request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); - auto cmdResponseStatus = 
Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand( - txn, + auto cmdResponseStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand( + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", configCmdObj, @@ -275,7 +275,7 @@ Status mergeChunks(OperationContext* txn, // { ChunkVersion shardVersionAfterMerge; - refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersionAfterMerge); + refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &shardVersionAfterMerge); if (!refreshStatus.isOK()) { std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk [" @@ -301,7 +301,7 @@ Status mergeChunks(OperationContext* txn, auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus); if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) && - _checkMetadataForSuccess(txn, nss, minKey, maxKey)) { + _checkMetadataForSuccess(opCtx, nss, minKey, maxKey)) { LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey) << ") has already been committed."; @@ -360,13 +360,13 @@ public: // Optional, if the merge is only valid for a particular epoch static BSONField<OID> epochField; - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) override { - uassertStatusOK(ShardingState::get(txn)->canAcceptShardedCommands()); + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); string ns = parseNs(dbname, cmdObj); @@ -409,7 +409,7 @@ public: return false; } - auto mergeStatus = mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch); + auto mergeStatus = mergeChunks(opCtx, NamespaceString(ns), minKey, maxKey, epoch); return appendCommandStatus(result, mergeStatus); } } mergeChunksCmd; diff --git a/src/mongo/db/s/metadata_loader.cpp b/src/mongo/db/s/metadata_loader.cpp index f337e56224b..8385ea5c3d6 100644 --- a/src/mongo/db/s/metadata_loader.cpp +++ b/src/mongo/db/s/metadata_loader.cpp @@ -77,12 +77,12 @@ public: return chunk.getShard() == _currShard; } - virtual pair<BSONObj, CachedChunkInfo> rangeFor(OperationContext* txn, + virtual pair<BSONObj, CachedChunkInfo> rangeFor(OperationContext* opCtx, const ChunkType& chunk) const { return make_pair(chunk.getMin(), CachedChunkInfo(chunk.getMax(), chunk.getVersion())); } - virtual ShardId shardFor(OperationContext* txn, const ShardId& name) const { + virtual ShardId shardFor(OperationContext* opCtx, const ShardId& name) const { return name; } @@ -96,27 +96,27 @@ private: } // namespace -Status MetadataLoader::makeCollectionMetadata(OperationContext* txn, +Status MetadataLoader::makeCollectionMetadata(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const string& ns, const string& shard, const CollectionMetadata* oldMetadata, CollectionMetadata* metadata) { - Status initCollectionStatus = _initCollection(txn, catalogClient, ns, shard, metadata); + Status initCollectionStatus = _initCollection(opCtx, catalogClient, ns, shard, metadata); if (!initCollectionStatus.isOK()) { return initCollectionStatus; } - return _initChunks(txn, catalogClient, ns, shard, oldMetadata, metadata); + return _initChunks(opCtx, catalogClient, ns, shard, oldMetadata, metadata); } -Status MetadataLoader::_initCollection(OperationContext* txn, +Status MetadataLoader::_initCollection(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const string& ns, const string& shard, CollectionMetadata* metadata) { // Get the config.collections entry 
for 'ns'. - auto coll = catalogClient->getCollection(txn, ns); + auto coll = catalogClient->getCollection(opCtx, ns); if (!coll.isOK()) { return coll.getStatus(); } @@ -138,7 +138,7 @@ Status MetadataLoader::_initCollection(OperationContext* txn, return Status::OK(); } -Status MetadataLoader::_initChunks(OperationContext* txn, +Status MetadataLoader::_initChunks(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const string& ns, const string& shard, @@ -186,7 +186,7 @@ Status MetadataLoader::_initChunks(OperationContext* txn, const auto diffQuery = SCMConfigDiffTracker::createConfigDiffQuery(NamespaceString(ns), metadata->_collVersion); std::vector<ChunkType> chunks; - Status status = catalogClient->getChunks(txn, + Status status = catalogClient->getChunks(opCtx, diffQuery.query, diffQuery.sort, boost::none, @@ -200,7 +200,7 @@ Status MetadataLoader::_initChunks(OperationContext* txn, // If we are the primary, or a standalone, persist new chunks locally. status = _writeNewChunksIfPrimary( - txn, NamespaceString(ns), chunks, metadata->_collVersion.epoch()); + opCtx, NamespaceString(ns), chunks, metadata->_collVersion.epoch()); if (!status.isOK()) { return status; } @@ -210,7 +210,7 @@ Status MetadataLoader::_initChunks(OperationContext* txn, // last time). If not, something has changed on the config server (potentially between // when we read the collection data and when we read the chunks data). // - int diffsApplied = differ.calculateConfigDiff(txn, chunks); + int diffsApplied = differ.calculateConfigDiff(opCtx, chunks); if (diffsApplied > 0) { // Chunks found, return ok LOG(2) << "loaded " << diffsApplied << " chunks into new metadata for " << ns @@ -253,7 +253,7 @@ Status MetadataLoader::_initChunks(OperationContext* txn, } } -Status MetadataLoader::_writeNewChunksIfPrimary(OperationContext* txn, +Status MetadataLoader::_writeNewChunksIfPrimary(OperationContext* opCtx, const NamespaceString& nss, const std::vector<ChunkType>& chunks, const OID& currEpoch) { @@ -261,13 +261,13 @@ Status MetadataLoader::_writeNewChunksIfPrimary(OperationContext* txn, // Only do the write(s) if this is a primary or standalone. Otherwise, return OK. 
if (serverGlobalParams.clusterRole != ClusterRole::ShardServer || - !repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase_UNSAFE( - txn, chunkMetadataNss.ns())) { + !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase_UNSAFE( + opCtx, chunkMetadataNss.ns())) { return Status::OK(); } try { - DBDirectClient client(txn); + DBDirectClient client(opCtx); /** * Here are examples of the operations that can happen on the config server to update diff --git a/src/mongo/db/s/metadata_loader.h b/src/mongo/db/s/metadata_loader.h index 8c12233da2e..cfeb6348cfb 100644 --- a/src/mongo/db/s/metadata_loader.h +++ b/src/mongo/db/s/metadata_loader.h @@ -56,7 +56,7 @@ class OperationContext; * Example usage: * beforeMetadata = <get latest local metadata>; * remoteMetadata = makeCollectionMetadata( beforeMetadata, remoteMetadata ); - * DBLock lock(txn, dbname, MODE_X); + * DBLock lock(opCtx, dbname, MODE_X); * afterMetadata = <get latest local metadata>; * * The loader will go out of its way to try to fetch the smallest possible amount of data @@ -85,7 +85,7 @@ public: * @return HostUnreachable if there was an error contacting the config servers * @return RemoteChangeDetected if the data loaded was modified by another operation */ - static Status makeCollectionMetadata(OperationContext* txn, + static Status makeCollectionMetadata(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const std::string& ns, const std::string& shard, @@ -104,7 +104,7 @@ private: * @return RemoteChangeDetected if the collection doc loaded is unexpectedly different * */ - static Status _initCollection(OperationContext* txn, + static Status _initCollection(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const std::string& ns, const std::string& shard, @@ -123,7 +123,7 @@ private: * @return NamespaceNotFound if there are no chunks loaded and an epoch change is detected * TODO: @return FailedToParse */ - static Status _initChunks(OperationContext* txn, + static Status _initChunks(OperationContext* opCtx, ShardingCatalogClient* catalogClient, const std::string& ns, const std::string& shard, @@ -148,7 +148,7 @@ private: * 'currEpoch' * - Other errors in writes/reads to the config.chunks.ns collection fail. 
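// [Editor's note, not part of this commit] The header comments above document the
// MetadataLoader contract and its error codes. A minimal sketch of a refresh cycle,
// assuming an OperationContext* opCtx, a ShardingCatalogClient* catalogClient, a
// namespace string 'ns', a shard name 'shardName', and the previously cached
// 'beforeMetadata' are in scope (these names are illustrative, not from the diff):
//
//     CollectionMetadata remoteMetadata;
//     Status status = MetadataLoader::makeCollectionMetadata(
//         opCtx, catalogClient, ns, shardName, &beforeMetadata, &remoteMetadata);
//     if (status.code() == ErrorCodes::RemoteChangeDetected) {
//         // Another operation modified the config data mid-load; the caller retries.
//     }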
*/ - static Status _writeNewChunksIfPrimary(OperationContext* txn, + static Status _writeNewChunksIfPrimary(OperationContext* opCtx, const NamespaceString& nss, const std::vector<ChunkType>& chunks, const OID& currEpoch); diff --git a/src/mongo/db/s/metadata_loader_test.cpp b/src/mongo/db/s/metadata_loader_test.cpp index 964cffe9071..b9d10773563 100644 --- a/src/mongo/db/s/metadata_loader_test.cpp +++ b/src/mongo/db/s/metadata_loader_test.cpp @@ -238,10 +238,10 @@ TEST_F(MetadataLoaderTest, NoChunksIsDropped) { auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), @@ -272,10 +272,10 @@ TEST_F(MetadataLoaderTest, CheckNumChunk) { auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), @@ -299,10 +299,10 @@ TEST_F(MetadataLoaderTest, SingleChunkCheckNumChunk) { auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), @@ -326,10 +326,10 @@ TEST_F(MetadataLoaderTest, SeveralChunksCheckNumChunks) { auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), @@ -353,10 +353,10 @@ TEST_F(MetadataLoaderTest, CollectionMetadataSetUp) { auto future = launchAsync([this] { ON_BLOCK_EXIT([&] { Client::destroy(); }); Client::initThreadIfNotAlready("Test"); - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); CollectionMetadata metadata; - auto status = MetadataLoader::makeCollectionMetadata(txn.get(), + auto status = MetadataLoader::makeCollectionMetadata(opCtx.get(), catalogClient(), kNss.ns(), kShardId.toString(), diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp index 08f26ac3298..4f55d9f6f05 100644 --- a/src/mongo/db/s/metadata_manager_test.cpp +++ b/src/mongo/db/s/metadata_manager_test.cpp @@ -251,13 +251,13 @@ TEST_F(MetadataManagerTest, NotificationBlocksUntilDeletion) { ChunkRange cr1(BSON("key" << 0), BSON("key" << 10)); auto notification = manager.addRangeToClean(cr1); - auto txn = cc().makeOperationContext().get(); + auto opCtx = cc().makeOperationContext().get(); // Once the new range deleter is set up, this might fail if the range deleter // deleted cr1 before we got here... 
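// [Editor's note, not part of this commit] One pre-existing hazard is worth flagging in
// the test above: 'auto opCtx = cc().makeOperationContext().get();' calls .get() on a
// temporary owning handle, so the OperationContext is destroyed at the end of that
// statement and the raw pointer dangles. A safer spelling keeps the owner alive for
// the whole scope, as the other tests in this diff do:
//
//     auto opCtxHolder = cc().makeOperationContext();  // owns the OperationContext
//     auto opCtx = opCtxHolder.get();                   // raw pointer valid within scope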
- ASSERT_FALSE(notification->waitFor(txn, Milliseconds(0))); + ASSERT_FALSE(notification->waitFor(opCtx, Milliseconds(0))); manager.removeRangeToClean(cr1); - ASSERT_TRUE(notification->waitFor(txn, Milliseconds(0))); + ASSERT_TRUE(notification->waitFor(opCtx, Milliseconds(0))); ASSERT_OK(notification->get()); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h index 04cf9e36df2..50a31da4db6 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source.h +++ b/src/mongo/db/s/migration_chunk_cloner_source.h @@ -65,7 +65,7 @@ public: * NOTE: Must be called without any locks and must succeed, before any other methods are called * (except for cancelClone and [insert/update/delete]Op). */ - virtual Status startClone(OperationContext* txn) = 0; + virtual Status startClone(OperationContext* opCtx) = 0; /** * Blocking method, which uses some custom-selected logic for deciding whether it is appropriate @@ -77,7 +77,7 @@ public: * * NOTE: Must be called without any locks. */ - virtual Status awaitUntilCriticalSectionIsAppropriate(OperationContext* txn, + virtual Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx, Milliseconds maxTimeToWait) = 0; /** @@ -90,7 +90,7 @@ public: * * NOTE: Must be called without any locks. */ - virtual Status commitClone(OperationContext* txn) = 0; + virtual Status commitClone(OperationContext* opCtx) = 0; /** * Tells the recipient to abort the clone and clean up any unused data. This method's @@ -98,7 +98,7 @@ public: * * NOTE: Must be called without any locks. */ - virtual void cancelClone(OperationContext* txn) = 0; + virtual void cancelClone(OperationContext* opCtx) = 0; // These methods are only meaningful for the legacy cloner and they are used as a way to keep a // running list of changes, which need to be fetched. @@ -109,7 +109,7 @@ public: * * NOTE: Must be called with at least IS lock held on the collection. */ - virtual bool isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc) = 0; + virtual bool isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc) = 0; /** * Notifies this cloner that an insert happened to the collection, which it owns. It is up to @@ -118,7 +118,7 @@ * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) = 0; + virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0; /** * Notifies this cloner that an update happened to the collection, which it owns. It is up to @@ -127,7 +127,7 @@ * * NOTE: Must be called with at least IX lock held on the collection. */ - virtual void onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) = 0; + virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0; /** * Notifies this cloner that a delete happened to the collection, which it owns. It is up to the @@ -136,7 +136,7 @@ * * NOTE: Must be called with at least IX lock held on the collection. 
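// [Editor's note, not part of this commit] The interface above defines the donor-side
// lifecycle: startClone, then awaitUntilCriticalSectionIsAppropriate, then commitClone
// on success or cancelClone on failure, each called without locks held. A minimal
// sketch of a hypothetical caller (the 'cloner' instance and the timeout value are
// assumptions for illustration):
//
//     uassertStatusOK(cloner->startClone(opCtx));
//     Status ready =
//         cloner->awaitUntilCriticalSectionIsAppropriate(opCtx, Milliseconds(30000));
//     if (ready.isOK()) {
//         uassertStatusOK(cloner->commitClone(opCtx));  // recipient has caught up
//     } else {
//         cloner->cancelClone(opCtx);  // tell the recipient to abort and clean up
//     }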
*/ - virtual void onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) = 0; + virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0; protected: MigrationChunkClonerSource(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index ac6b513a049..9354f60b8e1 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -90,10 +90,10 @@ BSONObj createRequestWithSessionId(StringData commandName, */ class DeleteNotificationStage final : public PlanStage { public: - DeleteNotificationStage(MigrationChunkClonerSourceLegacy* cloner, OperationContext* txn) - : PlanStage("SHARDING_NOTIFY_DELETE", txn), _cloner(cloner) {} + DeleteNotificationStage(MigrationChunkClonerSourceLegacy* cloner, OperationContext* opCtx) + : PlanStage("SHARDING_NOTIFY_DELETE", opCtx), _cloner(cloner) {} - void doInvalidate(OperationContext* txn, const RecordId& dl, InvalidationType type) override { + void doInvalidate(OperationContext* opCtx, const RecordId& dl, InvalidationType type) override { if (type == INVALIDATION_DELETION) { stdx::lock_guard<stdx::mutex> sl(_cloner->_mutex); _cloner->_cloneLocs.erase(dl); @@ -182,12 +182,12 @@ MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(!_deleteNotifyExec); } -Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { +Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { invariant(_state == kNew); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); // Load the ids of the currently available documents - auto storeCurrentLocsStatus = _storeCurrentLocs(txn); + auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx); if (!storeCurrentLocsStatus.isOK()) { return storeCurrentLocsStatus; } @@ -223,9 +223,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* txn) { } Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( - OperationContext* txn, Milliseconds maxTimeToWait) { + OperationContext* opCtx, Milliseconds maxTimeToWait) { invariant(_state == kCloning); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); const auto startTime = Date_t::now(); @@ -297,7 +297,7 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( "Aborting migration because of high memory usage"}; } - Status interruptStatus = txn->checkForInterruptNoAssert(); + Status interruptStatus = opCtx->checkForInterruptNoAssert(); if (!interruptStatus.isOK()) { return interruptStatus; } @@ -306,23 +306,23 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( return {ErrorCodes::ExceededTimeLimit, "Timed out waiting for the cloner to catch up"}; } -Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* txn) { +Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { invariant(_state == kCloning); - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); auto responseStatus = _callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId)); if (responseStatus.isOK()) { - _cleanup(txn); + _cleanup(opCtx); return Status::OK(); } - cancelClone(txn); + cancelClone(opCtx); return responseStatus.getStatus(); } -void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) { - 
invariant(!txn->lockState()->isLocked()); +void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); switch (_state) { case kDone: @@ -331,21 +331,21 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* txn) { _callRecipient(createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId)); // Intentional fall through case kNew: - _cleanup(txn); + _cleanup(opCtx); break; default: MONGO_UNREACHABLE; } } -bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* txn, +bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc) { return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern); } -void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; if (idElement.eoo()) { @@ -358,12 +358,12 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* txn, return; } - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i')); } -void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = updatedDoc["_id"]; if (idElement.eoo()) { @@ -376,12 +376,12 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* txn, return; } - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u')); } -void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; if (idElement.eoo()) { @@ -390,7 +390,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* txn, return; } - txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); + opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd')); } uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { @@ -400,12 +400,12 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { _averageObjectSizeForCloneLocs * _cloneLocs.size()); } -Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, +Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, Collection* collection, BSONArrayBuilder* arrBuilder) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); + 
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); - ElapsedTracker tracker(txn->getServiceContext()->getFastClockSource(), + ElapsedTracker tracker(opCtx->getServiceContext()->getFastClockSource(), internalQueryExecYieldIterations.load(), Milliseconds(internalQueryExecYieldPeriodMS.load())); @@ -421,7 +421,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, } Snapshotted<BSONObj> doc; - if (collection->findDoc(txn, *it, &doc)) { + if (collection->findDoc(opCtx, *it, &doc)) { // Use the builder size instead of accumulating the document sizes directly so that we // take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && @@ -444,10 +444,10 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* txn, return Status::OK(); } -Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn, +Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder) { - dassert(txn->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); + dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IS)); stdx::lock_guard<stdx::mutex> sl(_mutex); @@ -456,15 +456,15 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* txn, long long docSizeAccumulator = 0; - _xfer(txn, db, &_deleted, builder, "deleted", &docSizeAccumulator, false); - _xfer(txn, db, &_reload, builder, "reload", &docSizeAccumulator, true); + _xfer(opCtx, db, &_deleted, builder, "deleted", &docSizeAccumulator, false); + _xfer(opCtx, db, &_reload, builder, "reload", &docSizeAccumulator, true); builder->append("size", docSizeAccumulator); return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) { +void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* opCtx) { { stdx::lock_guard<stdx::mutex> sl(_mutex); _state = kDone; @@ -473,8 +473,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup(OperationContext* txn) { } if (_deleteNotifyExec) { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS); + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); _deleteNotifyExec.reset(); } @@ -510,9 +510,9 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(const BSONO return responseStatus.data.getOwned(); } -Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn) { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, _args.getNss(), MODE_IS); +Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) { + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS); Collection* const collection = autoColl.getCollection(); if (!collection) { @@ -523,7 +523,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore, any // multi-key index prefixed by shard key cannot be multikey over the shard key fields. 
IndexDescriptor* const idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, + collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, _shardKeyPattern.toBSON(), false); // requireSingleKey if (!idx) { @@ -535,9 +535,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn // Install the stage, which will listen for notifications on the collection auto statusWithDeleteNotificationPlanExecutor = - PlanExecutor::make(txn, + PlanExecutor::make(opCtx, stdx::make_unique<WorkingSet>(), - stdx::make_unique<DeleteNotificationStage>(this, txn), + stdx::make_unique<DeleteNotificationStage>(this, opCtx), collection, PlanExecutor::YIELD_MANUAL); if (!statusWithDeleteNotificationPlanExecutor.isOK()) { @@ -554,7 +554,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false)); std::unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(txn, + InternalPlanner::indexScan(opCtx, collection, idx, min, @@ -572,9 +572,9 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn unsigned long long maxRecsWhenFull; long long avgRecSize; - const long long totalRecs = collection->numRecords(txn); + const long long totalRecs = collection->numRecords(opCtx); if (totalRecs > 0) { - avgRecSize = collection->dataSize(txn) / totalRecs; + avgRecSize = collection->dataSize(opCtx) / totalRecs; maxRecsWhenFull = _args.getMaxChunkSizeBytes() / avgRecSize; maxRecsWhenFull = std::min((unsigned long long)(kMaxObjectPerChunk + 1), 130 * maxRecsWhenFull / 100 /* slack */); @@ -610,7 +610,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn << WorkingSetCommon::toStatusString(obj)}; } - const uint64_t collectionAverageObjectSize = collection->averageObjectSize(txn); + const uint64_t collectionAverageObjectSize = collection->averageObjectSize(opCtx); if (isLargeChunk) { return { @@ -638,7 +638,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* txn return Status::OK(); } -void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* txn, +void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx, Database* db, std::list<BSONObj>* docIdList, BSONObjBuilder* builder, @@ -660,7 +660,7 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* txn, BSONObj idDoc = *docIdIter; if (explode) { BSONObj fullDoc; - if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) { + if (Helpers::findById(opCtx, db, ns.c_str(), idDoc, fullDoc)) { arr.append(fullDoc); *sizeAccumulator += fullDoc.objsize(); } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index c683df2be29..7f8b7bf5468 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -61,22 +61,22 @@ public: HostAndPort recipientHost); ~MigrationChunkClonerSourceLegacy(); - Status startClone(OperationContext* txn) override; + Status startClone(OperationContext* opCtx) override; - Status awaitUntilCriticalSectionIsAppropriate(OperationContext* txn, + Status awaitUntilCriticalSectionIsAppropriate(OperationContext* opCtx, Milliseconds maxTimeToWait) override; - Status commitClone(OperationContext* txn) override; + Status commitClone(OperationContext* opCtx) override; - void cancelClone(OperationContext* txn) override; + void cancelClone(OperationContext* opCtx) override; - bool 
isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc) override; + bool isDocumentInMigratingChunk(OperationContext* opCtx, const BSONObj& doc) override; - void onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) override; + void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override; - void onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) override; + void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override; - void onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) override; + void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override; // Legacy cloner specific functionality @@ -108,7 +108,7 @@ public: * * NOTE: Must be called with the collection lock held in at least IS mode. */ - Status nextCloneBatch(OperationContext* txn, + Status nextCloneBatch(OperationContext* opCtx, Collection* collection, BSONArrayBuilder* arrBuilder); @@ -119,7 +119,7 @@ public: * * NOTE: Must be called with the collection lock held in at least IS mode. */ - Status nextModsBatch(OperationContext* txn, Database* db, BSONObjBuilder* builder); + Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); private: friend class DeleteNotificationStage; @@ -132,7 +132,7 @@ private: * Idempotent method, which cleans up any previously initialized state. It is safe to be called * at any time, but no methods should be called after it. */ - void _cleanup(OperationContext* txn); + void _cleanup(OperationContext* opCtx); /** * Synchronously invokes the recipient shard with the specified command and either returns the @@ -146,7 +146,7 @@ private: * * Returns OK or any error status otherwise. */ - Status _storeCurrentLocs(OperationContext* txn); + Status _storeCurrentLocs(OperationContext* opCtx); /** * Insert items from docIdList to a new array with the given fieldName in the given builder. If @@ -156,7 +156,7 @@ private: * * Should be holding the collection lock for ns if explode is true. 
*/ - void _xfer(OperationContext* txn, + void _xfer(OperationContext* opCtx, Database* db, std::list<BSONObj>* docIdList, BSONObjBuilder* builder, diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp index 2c2df8cd3f2..a51ef083521 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp @@ -57,21 +57,21 @@ class AutoGetActiveCloner { MONGO_DISALLOW_COPYING(AutoGetActiveCloner); public: - AutoGetActiveCloner(OperationContext* txn, const MigrationSessionId& migrationSessionId) - : _scopedXact(txn, MODE_IS) { - ShardingState* const gss = ShardingState::get(txn); + AutoGetActiveCloner(OperationContext* opCtx, const MigrationSessionId& migrationSessionId) + : _scopedXact(opCtx, MODE_IS) { + ShardingState* const gss = ShardingState::get(opCtx); const auto nss = gss->getActiveDonateChunkNss(); uassert(ErrorCodes::NotYetInitialized, "No active migrations were found", nss); // Once the collection is locked, the migration status cannot change - _autoColl.emplace(txn, *nss, MODE_IS); + _autoColl.emplace(opCtx, *nss, MODE_IS); uassert(ErrorCodes::NamespaceNotFound, str::stream() << "Collection " << nss->ns() << " does not exist", _autoColl->getCollection()); - auto css = CollectionShardingState::get(txn, *nss); + auto css = CollectionShardingState::get(opCtx, *nss); uassert(ErrorCodes::IllegalOperation, str::stream() << "No active migrations were found for collection " << nss->ns(), css && css->getMigrationSourceManager()); @@ -143,7 +143,7 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string&, BSONObj& cmdObj, int options, @@ -159,7 +159,7 @@ public: int arrSizeAtPrevIteration = -1; while (!arrBuilder || arrBuilder->arrSize() > arrSizeAtPrevIteration) { - AutoGetActiveCloner autoCloner(txn, migrationSessionId); + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); if (!arrBuilder) { arrBuilder.emplace(autoCloner.getCloner()->getCloneBatchBufferAllocationSize()); @@ -168,7 +168,7 @@ public: arrSizeAtPrevIteration = arrBuilder->arrSize(); uassertStatusOK(autoCloner.getCloner()->nextCloneBatch( - txn, autoCloner.getColl(), arrBuilder.get_ptr())); + opCtx, autoCloner.getColl(), arrBuilder.get_ptr())); } invariant(arrBuilder); @@ -207,7 +207,7 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string&, BSONObj& cmdObj, int options, @@ -216,9 +216,9 @@ public: const MigrationSessionId migrationSessionId( uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj))); - AutoGetActiveCloner autoCloner(txn, migrationSessionId); + AutoGetActiveCloner autoCloner(opCtx, migrationSessionId); - uassertStatusOK(autoCloner.getCloner()->nextModsBatch(txn, autoCloner.getDb(), &result)); + uassertStatusOK(autoCloner.getCloner()->nextModsBatch(opCtx, autoCloner.getDb(), &result)); return true; } diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 94188337f6b..193c237db5c 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -164,7 +164,7 @@ private: StaticCatalogClient() : ShardingCatalogClientMock(nullptr) {} 
StatusWith<repl::OpTimeWith<std::vector<ShardType>>> getAllShards( - OperationContext* txn, repl::ReadConcernLevel readConcern) override { + OperationContext* opCtx, repl::ReadConcernLevel readConcern) override { ShardType donorShard; donorShard.setName(kDonorConnStr.getSetName()); diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 77f72f637e8..cb46f30b056 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -114,7 +114,7 @@ bool isInRange(const BSONObj& obj, * * TODO: Could optimize this check out if sharding on _id. */ -bool willOverrideLocalId(OperationContext* txn, +bool willOverrideLocalId(OperationContext* opCtx, const string& ns, BSONObj min, BSONObj max, @@ -123,7 +123,7 @@ bool willOverrideLocalId(OperationContext* txn, BSONObj remoteDoc, BSONObj* localDoc) { *localDoc = BSONObj(); - if (Helpers::findById(txn, db, ns.c_str(), remoteDoc, *localDoc)) { + if (Helpers::findById(opCtx, db, ns.c_str(), remoteDoc, *localDoc)) { return !isInRange(*localDoc, min, max, shardKeyPattern); } @@ -134,14 +134,14 @@ bool willOverrideLocalId(OperationContext* txn, * Returns true if the majority of the nodes and the nodes corresponding to the given writeConcern * (if not empty) have applied till the specified lastOp. */ -bool opReplicatedEnough(OperationContext* txn, +bool opReplicatedEnough(OperationContext* opCtx, const repl::OpTime& lastOpApplied, const WriteConcernOptions& writeConcern) { WriteConcernOptions majorityWriteConcern; majorityWriteConcern.wTimeout = -1; majorityWriteConcern.wMode = WriteConcernOptions::kMajority; Status majorityStatus = repl::getGlobalReplicationCoordinator() - ->awaitReplication(txn, lastOpApplied, majorityWriteConcern) + ->awaitReplication(opCtx, lastOpApplied, majorityWriteConcern) .status; if (!writeConcern.shouldWaitForOtherNodes()) { @@ -153,7 +153,7 @@ bool opReplicatedEnough(OperationContext* txn, WriteConcernOptions userWriteConcern(writeConcern); userWriteConcern.wTimeout = -1; Status userStatus = repl::getGlobalReplicationCoordinator() - ->awaitReplication(txn, lastOpApplied, userWriteConcern) + ->awaitReplication(opCtx, lastOpApplied, userWriteConcern) .status; return majorityStatus.isOK() && userStatus.isOK(); @@ -429,7 +429,7 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, _isActiveCV.notify_all(); } -void MigrationDestinationManager::_migrateDriver(OperationContext* txn, +void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, @@ -447,7 +447,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, << epoch.toString() << " with session id " << *_sessionId; MoveTimingHelper timing( - txn, "to", _nss.ns(), min, max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); + opCtx, "to", _nss.ns(), min, max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); const auto initialState = getState(); @@ -463,7 +463,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // Just tests the connection conn->getLastError(); - DisableDocumentValidation validationDisabler(txn); + DisableDocumentValidation validationDisabler(opCtx); std::vector<BSONObj> indexSpecs; BSONObj idIndexSpec; @@ -483,8 +483,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, { // 0. 
copy system.namespaces entry if collection doesn't already exist - OldClientWriteContext ctx(txn, _nss.ns()); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, _nss)) { + OldClientWriteContext ctx(opCtx, _nss.ns()); + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { _errmsg = str::stream() << "Not primary during migration: " << _nss.ns() << ": checking if collection exists"; warning() << _errmsg; @@ -508,8 +508,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } } - WriteUnitOfWork wuow(txn); - Status status = userCreateNS(txn, db, _nss.ns(), options, true, idIndexSpec); + WriteUnitOfWork wuow(opCtx); + Status status = userCreateNS(opCtx, db, _nss.ns(), options, true, idIndexSpec); if (!status.isOK()) { warning() << "failed to create collection [" << _nss << "] " << " with options " << options << ": " << redact(status); @@ -521,11 +521,11 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, { // 1. copy indexes - ScopedTransaction scopedXact(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), _nss.db(), MODE_X); - OldClientContext ctx(txn, _nss.ns()); + ScopedTransaction scopedXact(opCtx, MODE_IX); + Lock::DBLock lk(opCtx->lockState(), _nss.db(), MODE_X); + OldClientContext ctx(opCtx, _nss.ns()); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(txn, _nss)) { + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) { _errmsg = str::stream() << "Not primary during migration: " << _nss.ns(); warning() << _errmsg; setState(FAIL); @@ -541,12 +541,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, return; } - MultiIndexBlock indexer(txn, collection); + MultiIndexBlock indexer(opCtx, collection); indexer.removeExistingIndexes(&indexSpecs); if (!indexSpecs.empty()) { // Only copy indexes if the collection does not have any documents. - if (collection->numRecords(txn) > 0) { + if (collection->numRecords(opCtx) > 0) { _errmsg = str::stream() << "aborting migration, shard is missing " << indexSpecs.size() << " indexes and " << "collection is not empty. 
Non-trivial " @@ -574,13 +574,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, return; } - WriteUnitOfWork wunit(txn); + WriteUnitOfWork wunit(opCtx); indexer.commit(); for (auto&& infoObj : indexInfoObjs.getValue()) { // make sure to create index on secondaries as well getGlobalServiceContext()->getOpObserver()->onCreateIndex( - txn, db->getSystemIndexesName(), infoObj, true /* fromMigrate */); + opCtx, db->getSystemIndexesName(), infoObj, true /* fromMigrate */); } wunit.commit(); @@ -605,13 +605,13 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, deleterOptions.onlyRemoveOrphanedDocs = true; deleterOptions.removeSaverReason = "preCleanup"; - if (!getDeleter()->deleteNow(txn, deleterOptions, &_errmsg)) { + if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) { warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg); setState(FAIL); return; } - Status status = _notePending(txn, _nss, min, max, epoch); + Status status = _notePending(opCtx, _nss, min, max, epoch); if (!status.isOK()) { _errmsg = status.reason(); setState(FAIL); @@ -646,7 +646,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, BSONObjIterator i(arr); while (i.more()) { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); if (getState() == ABORT) { log() << "Migration aborted while copying documents"; @@ -655,10 +655,10 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, BSONObj docToClone = i.next().Obj(); { - OldClientWriteContext cx(txn, _nss.ns()); + OldClientWriteContext cx(opCtx, _nss.ns()); BSONObj localDoc; - if (willOverrideLocalId(txn, + if (willOverrideLocalId(opCtx, _nss.ns(), min, max, @@ -677,7 +677,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, uasserted(16976, errMsg); } - Helpers::upsert(txn, _nss.ns(), docToClone, true); + Helpers::upsert(opCtx, _nss.ns(), docToClone, true); } thisTime++; @@ -690,8 +690,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, if (writeConcern.shouldWaitForOtherNodes()) { repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplication( - txn, - repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), writeConcern); if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { warning() << "secondaryThrottle on, but doc insert timed out; " @@ -712,7 +712,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // If running on a replicated system, we'll need to flush the docs we cloned to the // secondaries - repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); const BSONObj xferModsRequest = createTransferModsRequest(_nss, *_sessionId); @@ -735,20 +735,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, break; } - _applyMigrateOp(txn, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied); + _applyMigrateOp(opCtx, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied); const int maxIterations = 3600 * 50; int i; for (i = 0; i < maxIterations; i++) { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); if (getState() == ABORT) { log() << "Migration aborted while waiting for replication at catch up stage"; return; } - if (opReplicatedEnough(txn, lastOpApplied, 
writeConcern)) + if (opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) break; if (i > 100) { @@ -776,7 +776,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // until we're ready. Timer t; while (t.minutes() < 600) { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); if (getState() == ABORT) { log() << "Migration aborted while waiting for replication"; @@ -785,7 +785,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, log() << "Waiting for replication to catch up before entering critical section"; - if (_flushPendingWrites(txn, _nss.ns(), min, max, lastOpApplied, writeConcern)) { + if (_flushPendingWrites(opCtx, _nss.ns(), min, max, lastOpApplied, writeConcern)) { break; } @@ -806,7 +806,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, bool transferAfterCommit = false; while (getState() == STEADY || getState() == COMMIT_START) { - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); // Make sure we do at least one transfer after recv'ing the commit message. If we // aren't sure that at least one transfer happens *after* our state changes to @@ -826,7 +826,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, } if (res["size"].number() > 0 && - _applyMigrateOp(txn, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied)) { + _applyMigrateOp(opCtx, _nss.ns(), min, max, shardKeyPattern, res, &lastOpApplied)) { continue; } @@ -839,7 +839,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, // 1) The from side has told us that it has locked writes (COMMIT_START) // 2) We've checked at least one more time for un-transmitted mods if (getState() == COMMIT_START && transferAfterCommit == true) { - if (_flushPendingWrites(txn, _nss.ns(), min, max, lastOpApplied, writeConcern)) { + if (_flushPendingWrites(opCtx, _nss.ns(), min, max, lastOpApplied, writeConcern)) { break; } } @@ -867,7 +867,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn, conn.done(); } -bool MigrationDestinationManager::_applyMigrateOp(OperationContext* txn, +bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, const string& ns, const BSONObj& min, const BSONObj& max, @@ -882,20 +882,20 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* txn, bool didAnything = false; if (xfer["deleted"].isABSONObj()) { - ScopedTransaction scopedXact(txn, MODE_IX); - Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + ScopedTransaction scopedXact(opCtx, MODE_IX); + Lock::DBLock dlk(opCtx->lockState(), nsToDatabaseSubstring(ns), MODE_IX); Helpers::RemoveSaver rs("moveChunk", ns, "removedDuring"); BSONObjIterator i(xfer["deleted"].Obj()); // deleted documents while (i.more()) { - Lock::CollectionLock clk(txn->lockState(), ns, MODE_X); - OldClientContext ctx(txn, ns); + Lock::CollectionLock clk(opCtx->lockState(), ns, MODE_X); + OldClientContext ctx(opCtx, ns); BSONObj id = i.next().Obj(); // do not apply delete if doc does not belong to the chunk being migrated BSONObj fullObj; - if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) { + if (Helpers::findById(opCtx, ctx.db(), ns.c_str(), id, fullObj)) { if (!isInRange(fullObj, min, max, shardKeyPattern)) { if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) { invariant(0); @@ -908,7 +908,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* txn, rs.goingToDelete(fullObj); } - deleteObjects(txn, + deleteObjects(opCtx, ctx.db() ? 
ctx.db()->getCollection(ns) : nullptr, ns, id, true /* justOne */, false /* god */, true /* fromMigrate */); - *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + *lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); didAnything = true; } } @@ -925,7 +925,7 @@ if (xfer["reload"].isABSONObj()) { // modified documents (insert/update) BSONObjIterator i(xfer["reload"].Obj()); while (i.more()) { - OldClientWriteContext cx(txn, ns); + OldClientWriteContext cx(opCtx, ns); BSONObj updatedDoc = i.next().Obj(); @@ -939,7 +939,7 @@ BSONObj localDoc; if (willOverrideLocalId( - txn, ns, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) { + opCtx, ns, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) { string errMsg = str::stream() << "cannot migrate chunk, local document " << localDoc << " has same _id as reloaded remote document " << updatedDoc; @@ -951,9 +951,9 @@ } // We are in write lock here, so sure we aren't killing - Helpers::upsert(txn, ns, updatedDoc, true); + Helpers::upsert(opCtx, ns, updatedDoc, true); - *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(); + *lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); didAnything = true; } } @@ -961,13 +961,13 @@ return didAnything; } -bool MigrationDestinationManager::_flushPendingWrites(OperationContext* txn, +bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, const std::string& ns, BSONObj min, BSONObj max, const repl::OpTime& lastOpApplied, const WriteConcernOptions& writeConcern) { - if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) { + if (!opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) { repl::OpTime op(lastOpApplied); OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << ns << "' " << redact(min) << " -> " << redact(max) << " waiting for: " << op; @@ -979,11 +979,11 @@ { // Get global lock to wait for write to be committed to journal. 
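// [Editor's note, not part of this commit] _flushPendingWrites gates entry into the
// migration's critical section on replication progress. The opReplicatedEnough helper
// earlier in this file builds its majority check essentially as follows (a sketch;
// 'lastOpApplied' is the last op applied by the migration thread):
//
//     WriteConcernOptions majorityWriteConcern;
//     majorityWriteConcern.wTimeout = -1;                           // wait with no timeout
//     majorityWriteConcern.wMode = WriteConcernOptions::kMajority;  // a majority of nodes
//     Status majorityStatus = repl::getGlobalReplicationCoordinator()
//                                 ->awaitReplication(opCtx, lastOpApplied, majorityWriteConcern)
//                                 .status;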
- ScopedTransaction scopedXact(txn, MODE_S); - Lock::GlobalRead lk(txn->lockState()); + ScopedTransaction scopedXact(opCtx, MODE_S); + Lock::GlobalRead lk(opCtx->lockState()); // if durability is on, force a write to journal - if (getDur().commitNow(txn)) { + if (getDur().commitNow(opCtx)) { log() << "migrate commit flushed to journal for '" << ns << "' " << redact(min) << " -> " << redact(max); } @@ -992,15 +992,15 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* txn, return true; } -Status MigrationDestinationManager::_notePending(OperationContext* txn, +Status MigrationDestinationManager::_notePending(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& min, const BSONObj& max, const OID& epoch) { - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, nss); + auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); // This can currently happen because drops aren't synchronized with in-migrations. The idea @@ -1026,7 +1026,7 @@ Status MigrationDestinationManager::_notePending(OperationContext* txn, return Status::OK(); } -Status MigrationDestinationManager::_forgetPending(OperationContext* txn, +Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& min, const BSONObj& max, @@ -1040,10 +1040,10 @@ Status MigrationDestinationManager::_forgetPending(OperationContext* txn, _chunkMarkedPending = false; } - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, nss); + auto css = CollectionShardingState::get(opCtx, nss); auto metadata = css->getMetadata(); // This can currently happen because drops aren't synchronized with in-migrations. The idea diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 4106029c0f2..700e9284159 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -125,7 +125,7 @@ private: OID epoch, WriteConcernOptions writeConcern); - void _migrateDriver(OperationContext* txn, + void _migrateDriver(OperationContext* opCtx, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, @@ -133,7 +133,7 @@ private: const OID& epoch, const WriteConcernOptions& writeConcern); - bool _applyMigrateOp(OperationContext* txn, + bool _applyMigrateOp(OperationContext* opCtx, const std::string& ns, const BSONObj& min, const BSONObj& max, @@ -141,7 +141,7 @@ private: const BSONObj& xfer, repl::OpTime* lastOpApplied); - bool _flushPendingWrites(OperationContext* txn, + bool _flushPendingWrites(OperationContext* opCtx, const std::string& ns, BSONObj min, BSONObj max, @@ -158,7 +158,7 @@ private: * TODO: Because migrations may currently be active when a collection drops, an epoch is * necessary to ensure the pending metadata change is still applicable. 
*/ - Status _notePending(OperationContext* txn, + Status _notePending(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& min, const BSONObj& max, @@ -174,7 +174,7 @@ private: * TODO: Because migrations may currently be active when a collection drops, an epoch is * necessary to ensure the pending metadata change is still applicable. */ - Status _forgetPending(OperationContext* txn, + Status _forgetPending(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& min, const BSONObj& max, diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp index 3d684fda290..d3d54f6710c 100644 --- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp +++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp @@ -85,13 +85,13 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string&, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { - auto shardingState = ShardingState::get(txn); + auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); const ShardId toShard(cmdObj["toShardName"].String()); @@ -106,7 +106,7 @@ public: // consistent and predictable, generally we'd refresh anyway, and to be paranoid. ChunkVersion currentVersion; - Status status = shardingState->refreshMetadataNow(txn, nss, &currentVersion); + Status status = shardingState->refreshMetadataNow(opCtx, nss, &currentVersion); if (!status.isOK()) { errmsg = str::stream() << "cannot start receiving chunk " << redact(chunkRange.toString()) << causedBy(redact(status)); @@ -118,7 +118,7 @@ public: const auto secondaryThrottle = uassertStatusOK(MigrationSecondaryThrottleOptions::createFromCommand(cmdObj)); const auto writeConcern = uassertStatusOK( - ChunkMoveWriteConcernOptions::getEffectiveWriteConcern(txn, secondaryThrottle)); + ChunkMoveWriteConcernOptions::getEffectiveWriteConcern(opCtx, secondaryThrottle)); BSONObj shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned(); @@ -199,13 +199,13 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string&, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { - ShardingState::get(txn)->migrationDestinationManager()->report(result); + ShardingState::get(opCtx)->migrationDestinationManager()->report(result); return true; } @@ -240,14 +240,14 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string&, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { auto const sessionId = uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)); - auto mdm = ShardingState::get(txn)->migrationDestinationManager(); + auto mdm = ShardingState::get(opCtx)->migrationDestinationManager(); Status const status = mdm->startCommit(sessionId); mdm->report(result); if (!status.isOK()) { @@ -288,13 +288,13 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string&, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result) { - auto const mdm = ShardingState::get(txn)->migrationDestinationManager(); + auto const mdm =
ShardingState::get(opCtx)->migrationDestinationManager(); auto migrationSessionIdStatus(MigrationSessionId::extractFromBSON(cmdObj)); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 1de1af92316..5fb64445a75 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -76,7 +76,7 @@ MONGO_FP_DECLARE(migrationCommitNetworkError); MONGO_FP_DECLARE(failMigrationCommit); MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection); -MigrationSourceManager::MigrationSourceManager(OperationContext* txn, +MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, MoveChunkRequest request, ConnectionString donorConnStr, HostAndPort recipientHost) @@ -84,7 +84,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), _startTime() { - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); // Disallow moving a chunk to ourselves uassert(ErrorCodes::InvalidOptions, @@ -95,11 +95,11 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, << " with expected collection version epoch" << _args.getVersionEpoch(); // Now that the collection is locked, snapshot the metadata and fetch the latest versions - ShardingState* const shardingState = ShardingState::get(txn); + ShardingState* const shardingState = ShardingState::get(opCtx); ChunkVersion shardVersion; - Status refreshStatus = shardingState->refreshMetadataNow(txn, getNss(), &shardVersion); + Status refreshStatus = shardingState->refreshMetadataNow(opCtx, getNss(), &shardVersion); if (!refreshStatus.isOK()) { uasserted(refreshStatus.code(), str::stream() << "cannot start migrate of chunk " << _args.toString() @@ -117,10 +117,10 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, // Snapshot the committed metadata from the time the migration starts { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, getNss(), MODE_IS); + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - _collectionMetadata = CollectionShardingState::get(txn, getNss())->getMetadata(); + _collectionMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(); _keyPattern = _collectionMetadata->getKeyPattern(); } @@ -163,34 +163,34 @@ NamespaceString MigrationSourceManager::getNss() const { return _args.getNss(); } -Status MigrationSourceManager::startClone(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::startClone(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCreated); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); - - grid.catalogClient(txn)->logChange(txn, - "moveChunk.start", - getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() - << "from" - << _args.getFromShardId() - << "to" - << _args.getToShardId()), - ShardingCatalogClient::kMajorityWriteConcern); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); + + grid.catalogClient(opCtx)->logChange( + opCtx, + "moveChunk.start", + getNss().ns(), + BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" + << _args.getFromShardId() + << "to" + << _args.getToShardId()), + ShardingCatalogClient::kMajorityWriteConcern); _cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>( _args, 
_collectionMetadata->getKeyPattern(), _donorConnStr, _recipientHost); { // Register for notifications from the replication subsystem - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, getNss().ns()); - css->setMigrationSourceManager(txn, this); + auto css = CollectionShardingState::get(opCtx, getNss().ns()); + css->setMigrationSourceManager(opCtx, this); } - Status startCloneStatus = _cloneDriver->startClone(txn); + Status startCloneStatus = _cloneDriver->startClone(opCtx); if (!startCloneStatus.isOK()) { return startCloneStatus; } @@ -200,14 +200,14 @@ Status MigrationSourceManager::startClone(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::awaitToCatchUp(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCloning); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); // Block until the cloner deems it appropriate to enter the critical section. Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate( - txn, kMaxWaitToEnterCriticalSectionTimeout); + opCtx, kMaxWaitToEnterCriticalSectionTimeout); if (!catchUpStatus.isOK()) { return catchUpStatus; } @@ -217,13 +217,13 @@ Status MigrationSourceManager::awaitToCatchUp(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCloneCaughtUp); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); // Mark the shard as running critical operation, which requires recovery on crash - Status status = ShardingStateRecovery::startMetadataOp(txn); + Status status = ShardingStateRecovery::startMetadataOp(opCtx); if (!status.isOK()) { return status; } @@ -232,11 +232,11 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { // The critical section must be entered with collection X lock in order to ensure there are // no writes which could have entered and passed the version check just before we entered // the critical section, but managed to complete after we left it. - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); // Check that the collection has not been dropped or recreated since the migration began.
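The check that follows compares collection epochs. An epoch is the component of the collection version that is regenerated when a collection is dropped and recreated, so a snapshot taken at migration start can be compared against the current metadata to decide whether the migration still applies. A small sketch of that comparison, with a simplified stand-in for the real ChunkVersion type:

#include <cstdint>
#include <optional>

// simplified stand-in for mongo::ChunkVersion
struct ChunkVersion {
    uint64_t version;  // major/minor chunk version
    uint64_t epoch;    // regenerated when the collection is dropped/recreated
};

// mirrors the "!metadata || epochs differ" test in enterCriticalSection
bool migrationStillApplies(const std::optional<ChunkVersion>& current,
                           const ChunkVersion& snapshottedAtStart) {
    // Missing metadata or a changed epoch means the collection was dropped
    // (and possibly recreated) after the migration began.
    return current && current->epoch == snapshottedAtStart.epoch;
}

int main() {
    ChunkVersion atStart{5, 42};
    std::optional<ChunkVersion> now = ChunkVersion{7, 42};
    return migrationStillApplies(now, atStart) ? 0 : 1;  // same epoch: ok
}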
- auto css = CollectionShardingState::get(txn, getNss().ns()); + auto css = CollectionShardingState::get(opCtx, getNss().ns()); auto metadata = css->getMetadata(); if (!metadata || (metadata->getCollVersion().epoch() != _collectionMetadata->getCollVersion().epoch())) { @@ -261,13 +261,13 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCriticalSection); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); // Tell the recipient shard to fetch the latest changes. - Status commitCloneStatus = _cloneDriver->commitClone(txn); + Status commitCloneStatus = _cloneDriver->commitClone(opCtx); if (MONGO_FAIL_POINT(failMigrationCommit) && commitCloneStatus.isOK()) { commitCloneStatus = {ErrorCodes::InternalError, @@ -284,10 +284,10 @@ Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCloneCompleted); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); ChunkType migratedChunkType; migratedChunkType.setMin(_args.getMinKey()); @@ -319,7 +319,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn auto commitChunkMigrationResponse = grid.shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( - txn, + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", builder.obj(), @@ -342,8 +342,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn "against the config server to obtain its latest optime" << causedBy(redact(migrationCommitStatus)); - Status status = grid.catalogClient(txn)->logChange( - txn, + Status status = grid.catalogClient(opCtx)->logChange( + opCtx, "moveChunk.validating", getNss().ns(), BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" @@ -376,13 +376,13 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn // up so that subsequent requests will try to do a full refresh. 
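Each step above opens with auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); and calls Dismiss() only once the step succeeds, so every early return funnels into cleanup. A generic sketch of that guard idiom follows; it is an illustration of the pattern, not MongoDB's actual MakeGuard implementation (and it leans on C++17 copy elision for the factory's return):

#include <iostream>
#include <utility>

template <typename F>
class ScopeGuard {
public:
    explicit ScopeGuard(F fn) : _fn(std::move(fn)) {}
    ~ScopeGuard() {
        if (_active) _fn();  // cleanup fires unless the happy path dismissed it
    }
    void Dismiss() { _active = false; }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;
private:
    F _fn;
    bool _active = true;
};

template <typename F>
ScopeGuard<F> MakeGuard(F fn) { return ScopeGuard<F>(std::move(fn)); }

void cleanupOnError() { std::cout << "cleanup ran\n"; }

bool step(bool succeed) {
    auto scopedGuard = MakeGuard([] { cleanupOnError(); });
    if (!succeed) return false;  // guard fires on this early return
    scopedGuard.Dismiss();       // success: skip the cleanup
    return true;
}

int main() {
    step(false);  // prints "cleanup ran"
    step(true);   // prints nothing
}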
ChunkVersion unusedShardVersion; Status refreshStatus = - ShardingState::get(txn)->refreshMetadataNow(txn, getNss(), &unusedShardVersion); + ShardingState::get(opCtx)->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion); if (refreshStatus.isOK()) { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, getNss(), MODE_IS); + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - auto refreshedMetadata = CollectionShardingState::get(txn, getNss())->getMetadata(); + auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(); if (!refreshedMetadata) { return {ErrorCodes::NamespaceNotSharded, @@ -402,10 +402,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn log() << "Migration succeeded and updated collection version to " << refreshedMetadata->getCollVersion(); } else { - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - CollectionShardingState::get(txn, getNss())->refreshMetadata(txn, nullptr); + CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr); log() << "Failed to refresh metadata after a failed commit attempt. Metadata was cleared " "so it will get a full refresh when accessed again" @@ -420,52 +420,52 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection); scopedGuard.Dismiss(); - _cleanup(txn); - - grid.catalogClient(txn)->logChange(txn, - "moveChunk.commit", - getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() - << "from" - << _args.getFromShardId() - << "to" - << _args.getToShardId()), - ShardingCatalogClient::kMajorityWriteConcern); + _cleanup(opCtx); + + grid.catalogClient(opCtx)->logChange( + opCtx, + "moveChunk.commit", + getNss().ns(), + BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" + << _args.getFromShardId() + << "to" + << _args.getToShardId()), + ShardingCatalogClient::kMajorityWriteConcern); return Status::OK(); } -void MigrationSourceManager::cleanupOnError(OperationContext* txn) { +void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) { if (_state == kDone) { return; } - grid.catalogClient(txn)->logChange(txn, - "moveChunk.error", - getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() - << "from" - << _args.getFromShardId() - << "to" - << _args.getToShardId()), - ShardingCatalogClient::kMajorityWriteConcern); - - _cleanup(txn); + grid.catalogClient(opCtx)->logChange( + opCtx, + "moveChunk.error", + getNss().ns(), + BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" + << _args.getFromShardId() + << "to" + << _args.getToShardId()), + ShardingCatalogClient::kMajorityWriteConcern); + + _cleanup(opCtx); } -void MigrationSourceManager::_cleanup(OperationContext* txn) { +void MigrationSourceManager::_cleanup(OperationContext* opCtx) { invariant(_state != kDone); auto cloneDriver = [&]() { // Unregister from the collection's sharding state - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, getNss().ns()); + auto css = 
CollectionShardingState::get(opCtx, getNss().ns()); // The migration source manager is not visible anymore after it is unregistered from the // collection - css->clearMigrationSourceManager(txn); + css->clearMigrationSourceManager(opCtx); // Leave the critical section. if (_critSecSignal) { @@ -478,11 +478,11 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) { // Decrement the metadata op counter outside of the collection lock in order to hold it for as // short as possible. if (_state == kCriticalSection || _state == kCloneCompleted) { - ShardingStateRecovery::endMetadataOp(txn); + ShardingStateRecovery::endMetadataOp(opCtx); } if (cloneDriver) { - cloneDriver->cancelClone(txn); + cloneDriver->cancelClone(opCtx); } _state = kDone; diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index cb5ce4be792..c0822ca798a 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -83,7 +83,7 @@ public: * - SendStaleConfigException if the expected collection version does not match what we find it * to be after acquiring the distributed lock. */ - MigrationSourceManager(OperationContext* txn, + MigrationSourceManager(OperationContext* opCtx, MoveChunkRequest request, ConnectionString donorConnStr, HostAndPort recipientHost); @@ -101,7 +101,7 @@ public: * Expected state: kCreated * Resulting state: kCloning on success, kDone on failure */ - Status startClone(OperationContext* txn); + Status startClone(OperationContext* opCtx); /** * Waits for the cloning to catch up sufficiently so we won't have to stay in the critical @@ -111,7 +111,7 @@ public: * Expected state: kCloning * Resulting state: kCloneCaughtUp on success, kDone on failure */ - Status awaitToCatchUp(OperationContext* txn); + Status awaitToCatchUp(OperationContext* opCtx); /** * Waits for the active clone operation to catch up and enters critical section. Once this call @@ -122,7 +122,7 @@ public: * Expected state: kCloneCaughtUp * Resulting state: kCriticalSection on success, kDone on failure */ - Status enterCriticalSection(OperationContext* txn); + Status enterCriticalSection(OperationContext* opCtx); /** * Tells the recipient of the chunk to commit the chunk contents, which it received. @@ -130,7 +130,7 @@ public: * Expected state: kCriticalSection * Resulting state: kCloneCompleted on success, kDone on failure */ - Status commitChunkOnRecipient(OperationContext* txn); + Status commitChunkOnRecipient(OperationContext* opCtx); /** * Tells the recipient shard to fetch the latest portion of data from the donor and to commit it @@ -144,7 +144,7 @@ public: * Expected state: kCloneCompleted * Resulting state: kDone */ - Status commitChunkMetadataOnConfig(OperationContext* txn); + Status commitChunkMetadataOnConfig(OperationContext* opCtx); /** * May be called at any time. Unregisters the migration source manager from the collection, @@ -154,7 +154,7 @@ public: * Expected state: Any * Resulting state: kDone */ - void cleanupOnError(OperationContext* txn); + void cleanupOnError(OperationContext* opCtx); /** * Returns the key pattern object for the stored committed metadata. @@ -200,7 +200,7 @@ private: * Called when any of the states fails. May only be called once and will put the migration * manager into the kDone state. 
*/ - void _cleanup(OperationContext* txn); + void _cleanup(OperationContext* opCtx); // The parameters to the moveChunk command const MoveChunkRequest _args; diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp index 03a03c285bb..08ea3723920 100644 --- a/src/mongo/db/s/move_chunk_command.cpp +++ b/src/mongo/db/s/move_chunk_command.cpp @@ -110,13 +110,13 @@ public: return parseNsFullyQualified(dbname, cmdObj); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const string& dbname, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) override { - auto shardingState = ShardingState::get(txn); + auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); const MoveChunkRequest moveChunkRequest = uassertStatusOK( @@ -124,7 +124,7 @@ public: // Make sure we're as up-to-date as possible with shard information. This catches the case // where we might have changed a shard's host by removing/adding a shard with the same name. - grid.shardRegistry()->reload(txn); + grid.shardRegistry()->reload(opCtx); auto scopedRegisterMigration = uassertStatusOK(shardingState->registerDonateChunk(moveChunkRequest)); @@ -134,7 +134,7 @@ public: // Check if there is an existing migration running and if so, join it if (scopedRegisterMigration.mustExecute()) { try { - _runImpl(txn, moveChunkRequest); + _runImpl(opCtx, moveChunkRequest); status = Status::OK(); } catch (const DBException& e) { status = e.toStatus(); @@ -148,7 +148,7 @@ public: scopedRegisterMigration.complete(status); } else { - status = scopedRegisterMigration.waitForCompletion(txn); + status = scopedRegisterMigration.waitForCompletion(opCtx); } if (status == ErrorCodes::ChunkTooBig) { @@ -165,27 +165,27 @@ public: } private: - static void _runImpl(OperationContext* txn, const MoveChunkRequest& moveChunkRequest) { + static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) { const auto writeConcernForRangeDeleter = uassertStatusOK(ChunkMoveWriteConcernOptions::getEffectiveWriteConcern( - txn, moveChunkRequest.getSecondaryThrottle())); + opCtx, moveChunkRequest.getSecondaryThrottle())); // Resolve the donor and recipient shards and their connection string - auto const shardRegistry = Grid::get(txn)->shardRegistry(); + auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto donorConnStr = - uassertStatusOK(shardRegistry->getShard(txn, moveChunkRequest.getFromShardId())) + uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId())) ->getConnString(); const auto recipientHost = uassertStatusOK([&] { auto recipientShard = - uassertStatusOK(shardRegistry->getShard(txn, moveChunkRequest.getToShardId())); + uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId())); return recipientShard->getTargeter()->findHostNoWait( ReadPreferenceSetting{ReadPreference::PrimaryOnly}); }()); string unusedErrMsg; - MoveTimingHelper moveTimingHelper(txn, + MoveTimingHelper moveTimingHelper(opCtx, "from", moveChunkRequest.getNss().ns(), moveChunkRequest.getMinKey(), @@ -202,27 +202,27 @@ private: { MigrationSourceManager migrationSourceManager( - txn, moveChunkRequest, donorConnStr, recipientHost); + opCtx, moveChunkRequest, donorConnStr, recipientHost); shardKeyPattern = migrationSourceManager.getKeyPattern().getOwned(); moveTimingHelper.done(2); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep2); - 
uassertStatusOKWithWarning(migrationSourceManager.startClone(txn)); + uassertStatusOKWithWarning(migrationSourceManager.startClone(opCtx)); moveTimingHelper.done(3); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep3); - uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp(txn)); + uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp(opCtx)); moveTimingHelper.done(4); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep4); - uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection(txn)); - uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient(txn)); + uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection(opCtx)); + uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient(opCtx)); moveTimingHelper.done(5); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5); - uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig(txn)); + uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig(opCtx)); moveTimingHelper.done(6); MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6); } @@ -245,14 +245,14 @@ private: // This is an immediate delete, and as a consequence, there could be more // deletes happening simultaneously than there are deleter worker threads. - if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) { + if (!getDeleter()->deleteNow(opCtx, deleterOptions, &errMsg)) { log() << "Error occurred while performing cleanup: " << redact(errMsg); } } else { log() << "forking for cleanup of chunk data"; string errMsg; - if (!getDeleter()->queueDelete(txn, + if (!getDeleter()->queueDelete(opCtx, deleterOptions, NULL, // Don't want to be notified &errMsg)) { diff --git a/src/mongo/db/s/move_timing_helper.cpp b/src/mongo/db/s/move_timing_helper.cpp index 222a5383002..89c305cda43 100644 --- a/src/mongo/db/s/move_timing_helper.cpp +++ b/src/mongo/db/s/move_timing_helper.cpp @@ -39,7 +39,7 @@ namespace mongo { -MoveTimingHelper::MoveTimingHelper(OperationContext* txn, +MoveTimingHelper::MoveTimingHelper(OperationContext* opCtx, const std::string& where, const std::string& ns, const BSONObj& min, @@ -48,7 +48,7 @@ MoveTimingHelper::MoveTimingHelper(OperationContext* txn, std::string* cmdErrmsg, const ShardId& toShard, const ShardId& fromShard) - : _txn(txn), + : _opCtx(opCtx), _where(where), _ns(ns), _to(toShard), @@ -82,11 +82,11 @@ MoveTimingHelper::~MoveTimingHelper() { _b.append("errmsg", *_cmdErrmsg); } - grid.catalogClient(_txn)->logChange(_txn, - str::stream() << "moveChunk." << _where, - _ns, - _b.obj(), - ShardingCatalogClient::kMajorityWriteConcern); + grid.catalogClient(_opCtx)->logChange(_opCtx, + str::stream() << "moveChunk." 
<< _where, + _ns, + _b.obj(), + ShardingCatalogClient::kMajorityWriteConcern); } catch (const std::exception& e) { warning() << "couldn't record timing for moveChunk '" << _where << "': " << redact(e.what()); @@ -99,10 +99,10 @@ void MoveTimingHelper::done(int step) { const std::string s = str::stream() << "step " << step << " of " << _totalNumSteps; - CurOp* op = CurOp::get(_txn); + CurOp* op = CurOp::get(_opCtx); { - stdx::lock_guard<Client> lk(*_txn->getClient()); + stdx::lock_guard<Client> lk(*_opCtx->getClient()); op->setMessage_inlock(s.c_str()); } diff --git a/src/mongo/db/s/move_timing_helper.h b/src/mongo/db/s/move_timing_helper.h index bc1f2644ac7..eb8194f1ae6 100644 --- a/src/mongo/db/s/move_timing_helper.h +++ b/src/mongo/db/s/move_timing_helper.h @@ -41,7 +41,7 @@ class OperationContext; class MoveTimingHelper { public: - MoveTimingHelper(OperationContext* txn, + MoveTimingHelper(OperationContext* opCtx, const std::string& where, const std::string& ns, const BSONObj& min, @@ -58,7 +58,7 @@ private: // Measures how long the receiving of a chunk takes Timer _t; - OperationContext* const _txn; + OperationContext* const _opCtx; const std::string _where; const std::string _ns; const ShardId _to; diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 0f92bbd5492..fc13bf41287 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -46,8 +46,8 @@ const Microseconds kMaxWaitForMigrationCriticalSection = Minutes(5); OperationShardingState::OperationShardingState() = default; -OperationShardingState& OperationShardingState::get(OperationContext* txn) { - return shardingMetadataDecoration(txn); +OperationShardingState& OperationShardingState::get(OperationContext* opCtx) { + return shardingMetadataDecoration(opCtx); } void OperationShardingState::initializeShardVersion(NamespaceString nss, @@ -101,15 +101,15 @@ void OperationShardingState::unsetShardVersion(NamespaceString nss) { _clear(); } -bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* txn) { +bool OperationShardingState::waitForMigrationCriticalSectionSignal(OperationContext* opCtx) { // Must not block while holding a lock - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); if (_migrationCriticalSectionSignal) { _migrationCriticalSectionSignal->waitFor( - txn, - txn->hasDeadline() - ? std::min(txn->getRemainingMaxTimeMicros(), kMaxWaitForMigrationCriticalSection) + opCtx, + opCtx->hasDeadline() + ? 
std::min(opCtx->getRemainingMaxTimeMicros(), kMaxWaitForMigrationCriticalSection) : kMaxWaitForMigrationCriticalSection); _migrationCriticalSectionSignal = nullptr; return true; @@ -130,10 +130,10 @@ void OperationShardingState::_clear() { _ns = NamespaceString(); } -OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* txn, +OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* opCtx, const NamespaceString& ns) - : _txn(txn), _ns(ns) { - auto& oss = OperationShardingState::get(txn); + : _opCtx(opCtx), _ns(ns) { + auto& oss = OperationShardingState::get(opCtx); _hadOriginalVersion = oss._hasVersion; if (_hadOriginalVersion) { _originalVersion = oss.getShardVersion(ns); @@ -142,7 +142,7 @@ OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationCo } OperationShardingState::IgnoreVersioningBlock::~IgnoreVersioningBlock() { - auto& oss = OperationShardingState::get(_txn); + auto& oss = OperationShardingState::get(_opCtx); invariant(ChunkVersion::isIgnoredVersion(oss.getShardVersion(_ns))); if (_hadOriginalVersion) { oss.setShardVersion(_ns, _originalVersion); diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index aa03834da6a..d4a0e778af1 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -56,9 +56,9 @@ public: OperationShardingState(); /** - * Retrieves a reference to the shard version decorating the OperationContext, 'txn'. + * Retrieves a reference to the shard version decorating the OperationContext, 'opCtx'. */ - static OperationShardingState& get(OperationContext* txn); + static OperationShardingState& get(OperationContext* opCtx); /** * Parses shard version from the command parameters 'cmdObj' and stores the results in this @@ -104,7 +104,7 @@ public: * Returns true if the call actually waited because of migration critical section (regardless of * whether it timed out or not), false if there was no active migration critical section.
*/ - bool waitForMigrationCriticalSectionSignal(OperationContext* txn); + bool waitForMigrationCriticalSectionSignal(OperationContext* opCtx); /** * Setting this value indicates that when the version check failed, there was an active @@ -140,11 +140,11 @@ class OperationShardingState::IgnoreVersioningBlock { MONGO_DISALLOW_COPYING(IgnoreVersioningBlock); public: - IgnoreVersioningBlock(OperationContext* txn, const NamespaceString& ns); + IgnoreVersioningBlock(OperationContext* opCtx, const NamespaceString& ns); ~IgnoreVersioningBlock(); private: - OperationContext* _txn; + OperationContext* _opCtx; NamespaceString _ns; ChunkVersion _originalVersion; bool _hadOriginalVersion; diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp index d4cc01f1d06..a0ed2ca7e6b 100644 --- a/src/mongo/db/s/set_shard_version_command.cpp +++ b/src/mongo/db/s/set_shard_version_command.cpp @@ -88,13 +88,13 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string&, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result) { - auto shardingState = ShardingState::get(txn); + auto shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); // Steps @@ -128,7 +128,7 @@ public: // Step 1 - Client* client = txn->getClient(); + Client* client = opCtx->getClient(); LastError::get(client).disable(); const bool authoritative = cmdObj.getBoolField("authoritative"); @@ -156,7 +156,7 @@ public: // Validate shardName parameter. string shardName = cmdObj["shard"].str(); - auto storedShardName = ShardingState::get(txn)->getShardName(); + auto storedShardName = ShardingState::get(opCtx)->getShardName(); uassert(ErrorCodes::BadValue, str::stream() << "received shardName " << shardName << " which differs from stored shardName " @@ -180,7 +180,7 @@ public: return false; } - ConnectionString storedConnStr = ShardingState::get(txn)->getConfigServer(txn); + ConnectionString storedConnStr = ShardingState::get(opCtx)->getConfigServer(opCtx); if (givenConnStr.getSetName() != storedConnStr.getSetName()) { errmsg = str::stream() << "given config server set name: " << givenConnStr.getSetName() @@ -215,10 +215,10 @@ public: { boost::optional<AutoGetDb> autoDb; - autoDb.emplace(txn, nss.db(), MODE_IS); + autoDb.emplace(opCtx, nss.db(), MODE_IS); // we can run on a slave up to here - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(txn, + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(opCtx, nss.db())) { result.append("errmsg", "not master"); result.append("note", "from post init in setShardVersion"); @@ -227,14 +227,14 @@ public: // Views do not require a shard version check. if (autoDb->getDb() && !autoDb->getDb()->getCollection(nss.ns()) && - autoDb->getDb()->getViewCatalog()->lookup(txn, nss.ns())) { + autoDb->getDb()->getViewCatalog()->lookup(opCtx, nss.ns())) { return true; } boost::optional<Lock::CollectionLock> collLock; - collLock.emplace(txn->lockState(), nss.ns(), MODE_IS); + collLock.emplace(opCtx->lockState(), nss.ns(), MODE_IS); - auto css = CollectionShardingState::get(txn, nss); + auto css = CollectionShardingState::get(opCtx, nss); const ChunkVersion collectionShardVersion = (css->getMetadata() ? 
css->getMetadata()->getShardVersion() : ChunkVersion::UNSHARDED()); @@ -306,7 +306,7 @@ public: collLock.reset(); autoDb.reset(); log() << "waiting till out of critical section"; - critSecSignal->waitFor(txn, Seconds(10)); + critSecSignal->waitFor(opCtx, Seconds(10)); } } @@ -329,7 +329,7 @@ public: collLock.reset(); autoDb.reset(); log() << "waiting till out of critical section"; - critSecSignal->waitFor(txn, Seconds(10)); + critSecSignal->waitFor(opCtx, Seconds(10)); } } @@ -346,13 +346,13 @@ public: // Step 7 - Status status = shardingState->onStaleShardVersion(txn, nss, requestedVersion); + Status status = shardingState->onStaleShardVersion(opCtx, nss, requestedVersion); { - AutoGetCollection autoColl(txn, nss, MODE_IS); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); ChunkVersion currVersion = ChunkVersion::UNSHARDED(); - auto collMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); + auto collMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata(); if (collMetadata) { currVersion = collMetadata->getShardVersion(); } diff --git a/src/mongo/db/s/shard_identity_rollback_notifier.cpp b/src/mongo/db/s/shard_identity_rollback_notifier.cpp index b78efaa6fef..118cdb038b6 100644 --- a/src/mongo/db/s/shard_identity_rollback_notifier.cpp +++ b/src/mongo/db/s/shard_identity_rollback_notifier.cpp @@ -40,12 +40,12 @@ const auto getRollbackNotifier = ServiceContext::declareDecoration<ShardIdentity ShardIdentityRollbackNotifier::ShardIdentityRollbackNotifier() = default; -ShardIdentityRollbackNotifier* ShardIdentityRollbackNotifier::get(OperationContext* txn) { - return get(txn->getServiceContext()); +ShardIdentityRollbackNotifier* ShardIdentityRollbackNotifier::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); } -ShardIdentityRollbackNotifier* ShardIdentityRollbackNotifier::get(ServiceContext* txn) { - return &getRollbackNotifier(txn); +ShardIdentityRollbackNotifier* ShardIdentityRollbackNotifier::get(ServiceContext* opCtx) { + return &getRollbackNotifier(opCtx); } diff --git a/src/mongo/db/s/shard_identity_rollback_notifier.h b/src/mongo/db/s/shard_identity_rollback_notifier.h index a8bb6592350..4ce184065a2 100644 --- a/src/mongo/db/s/shard_identity_rollback_notifier.h +++ b/src/mongo/db/s/shard_identity_rollback_notifier.h @@ -59,8 +59,8 @@ public: /** * Retrieves the ShardIdentityRollbackNotifier associated with the specified service context. */ - static ShardIdentityRollbackNotifier* get(OperationContext* txn); - static ShardIdentityRollbackNotifier* get(ServiceContext* txn); + static ShardIdentityRollbackNotifier* get(OperationContext* opCtx); + static ShardIdentityRollbackNotifier* get(ServiceContext* opCtx); /** * Records the fact that the shardIdentity document was rolled back. 
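The notifier above is a ServiceContext decoration: ServiceContext::declareDecoration<ShardIdentityRollbackNotifier>() hangs one instance off every service context, and the get() overloads simply resolve it, the same idiom behind OperationShardingState::get(opCtx) elsewhere in this commit. A toy, type-keyed version of that idea (illustrative only, not the real decoration machinery):

#include <memory>
#include <typeindex>
#include <unordered_map>

class Context {
public:
    // one lazily-created instance of T per Context, addressed by type
    template <typename T>
    T* decoration() {
        auto& slot = _decorations[std::type_index(typeid(T))];
        if (!slot) slot = std::make_shared<Holder<T>>();
        return &static_cast<Holder<T>*>(slot.get())->value;
    }

private:
    struct HolderBase { virtual ~HolderBase() = default; };
    template <typename T>
    struct Holder : HolderBase { T value{}; };
    std::unordered_map<std::type_index, std::shared_ptr<HolderBase>> _decorations;
};

struct RollbackNotifier { bool rolledBack = false; };

int main() {
    Context serviceContext;
    // analogous to ShardIdentityRollbackNotifier::get(serviceContext)
    auto* notifier = serviceContext.decoration<RollbackNotifier>();
    notifier->rolledBack = true;
}

The payoff of the idiom is lifetime management: the decoration lives exactly as long as its owning context, so no global registry or explicit teardown is needed.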
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index ba292c269c2..b81af5a051a 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -49,7 +49,7 @@ namespace mongo { -Status initializeGlobalShardingStateForMongod(OperationContext* txn, +Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, const ConnectionString& configCS, StringData distLockProcessId) { auto targeterFactory = stdx::make_unique<RemoteCommandTargeterFactoryImpl>(); @@ -82,7 +82,7 @@ Status initializeGlobalShardingStateForMongod(OperationContext* txn, stdx::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); return initializeGlobalShardingState( - txn, + opCtx, configCS, distLockProcessId, std::move(shardFactory), diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h index faf24aededd..cf714002921 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.h +++ b/src/mongo/db/s/sharding_initialization_mongod.h @@ -42,7 +42,7 @@ class Status; * * NOTE: This does not initialize ShardingState, which should only be done for shard servers. */ -Status initializeGlobalShardingStateForMongod(OperationContext* txn, +Status initializeGlobalShardingStateForMongod(OperationContext* opCtx, const ConnectionString& configCS, StringData distLockProcessId); diff --git a/src/mongo/db/s/sharding_server_status.cpp b/src/mongo/db/s/sharding_server_status.cpp index 8f58e24600b..80fd5f12566 100644 --- a/src/mongo/db/s/sharding_server_status.cpp +++ b/src/mongo/db/s/sharding_server_status.cpp @@ -45,21 +45,22 @@ public: return true; } - BSONObj generateSection(OperationContext* txn, const BSONElement& configElement) const final { + BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const final { BSONObjBuilder result; - auto shardingState = ShardingState::get(txn); + auto shardingState = ShardingState::get(opCtx); if (shardingState->enabled() && serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { result.append("configsvrConnectionString", - shardingState->getConfigServer(txn).toString()); + shardingState->getConfigServer(opCtx).toString()); - Grid::get(txn)->configOpTime().append(&result, "lastSeenConfigServerOpTime"); + Grid::get(opCtx)->configOpTime().append(&result, "lastSeenConfigServerOpTime"); // Get a migration status report if a migration is active for which this is the source // shard. ShardingState::getActiveMigrationStatusReport will take an IS lock on the // namespace of the active migration if there is one that is active. 
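Earlier in this chunk, initializeGlobalShardingStateForMongod assembles a ShardFactory from a targeter factory plus a buildersMap of per-connection-type constructors. A stripped-down sketch of that builders-map wiring, with illustrative names and types rather than the real ShardFactory API:

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct Shard { std::string host; };
enum class ConnType { kStandalone, kReplicaSet };

using ShardBuilder = std::function<std::unique_ptr<Shard>(const std::string&)>;

int main() {
    // each connection-string flavor gets its own builder callback
    std::map<ConnType, ShardBuilder> buildersMap{
        {ConnType::kStandalone,
         [](const std::string& cs) { return std::make_unique<Shard>(Shard{cs}); }},
        {ConnType::kReplicaSet,
         [](const std::string& cs) { return std::make_unique<Shard>(Shard{cs}); }},
    };
    // the factory dispatches on the connection string's type
    auto shard = buildersMap.at(ConnType::kReplicaSet)("rs0/a:27017,b:27017");
    std::cout << shard->host << "\n";
}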
- BSONObj migrationStatus = ShardingState::get(txn)->getActiveMigrationStatusReport(txn); + BSONObj migrationStatus = + ShardingState::get(opCtx)->getActiveMigrationStatusReport(opCtx); if (!migrationStatus.isEmpty()) { result.append("migrations", migrationStatus); } diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 57fda772fb4..567877c216a 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -119,17 +119,17 @@ void updateShardIdentityConfigStringCB(const string& setName, const string& newC } } -bool haveLocalShardingInfo(OperationContext* txn, const string& ns) { - if (!ShardingState::get(txn)->enabled()) { +bool haveLocalShardingInfo(OperationContext* opCtx, const string& ns) { + if (!ShardingState::get(opCtx)->enabled()) { return false; } - const auto& oss = OperationShardingState::get(txn); + const auto& oss = OperationShardingState::get(opCtx); if (oss.hasShardVersion()) { return true; } - const auto& sci = ShardedConnectionInfo::get(txn->getClient(), false); + const auto& sci = ShardedConnectionInfo::get(opCtx->getClient(), false); if (sci && !sci->getVersion(ns).isStrictlyEqualTo(ChunkVersion::UNSHARDED())) { return true; } @@ -179,10 +179,10 @@ Status ShardingState::canAcceptShardedCommands() const { } } -ConnectionString ShardingState::getConfigServer(OperationContext* txn) { +ConnectionString ShardingState::getConfigServer(OperationContext* opCtx) { invariant(enabled()); stdx::lock_guard<stdx::mutex> lk(_mutex); - return Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(); + return Grid::get(opCtx)->shardRegistry()->getConfigServerConnectionString(); } string ShardingState::getShardName() { @@ -191,23 +191,23 @@ string ShardingState::getShardName() { return _shardName; } -void ShardingState::shutDown(OperationContext* txn) { +void ShardingState::shutDown(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_mutex); if (enabled()) { grid.getExecutorPool()->shutdownAndJoin(); - grid.catalogClient(txn)->shutDown(txn); + grid.catalogClient(opCtx)->shutDown(opCtx); } } -Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* txn) { +Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* opCtx) { if (!enabled()) { // Nothing to do if sharding state has not been initialized. 
return Status::OK(); } - boost::optional<repl::OpTime> opTime = rpc::ConfigServerMetadata::get(txn).getOpTime(); + boost::optional<repl::OpTime> opTime = rpc::ConfigServerMetadata::get(opCtx).getOpTime(); if (opTime) { - if (!AuthorizationSession::get(txn->getClient()) + if (!AuthorizationSession::get(opCtx->getClient()) ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), ActionType::internal)) { return Status(ErrorCodes::Unauthorized, "Unauthorized to update config opTime"); @@ -219,14 +219,14 @@ Status ShardingState::updateConfigServerOpTimeFromMetadata(OperationContext* txn return Status::OK(); } -CollectionShardingState* ShardingState::getNS(const std::string& ns, OperationContext* txn) { +CollectionShardingState* ShardingState::getNS(const std::string& ns, OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); CollectionShardingStateMap::iterator it = _collections.find(ns); if (it == _collections.end()) { auto inserted = _collections.insert(make_pair(ns, stdx::make_unique<CollectionShardingState>( - txn->getServiceContext(), NamespaceString(ns)))); + opCtx->getServiceContext(), NamespaceString(ns)))); invariant(inserted.second); it = std::move(inserted.first); } @@ -254,18 +254,18 @@ void ShardingState::scheduleCleanup(const NamespaceString& nss) { _scheduleWorkFn(nss); } -Status ShardingState::onStaleShardVersion(OperationContext* txn, +Status ShardingState::onStaleShardVersion(OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& expectedVersion) { - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); invariant(enabled()); LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version " << expectedVersion; // Ensure any ongoing migrations have completed - auto& oss = OperationShardingState::get(txn); - oss.waitForMigrationCriticalSectionSignal(txn); + auto& oss = OperationShardingState::get(opCtx); + oss.waitForMigrationCriticalSectionSignal(opCtx); ChunkVersion collectionShardVersion; @@ -274,9 +274,9 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn, ScopedCollectionMetadata currentMetadata; { - AutoGetCollection autoColl(txn, nss, MODE_IS); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); - currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); + currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata(); if (currentMetadata) { collectionShardVersion = currentMetadata->getShardVersion(); } @@ -290,23 +290,23 @@ Status ShardingState::onStaleShardVersion(OperationContext* txn, } auto refreshStatusAndVersion = - _refreshMetadata(txn, nss, (currentMetadata ? currentMetadata.getMetadata() : nullptr)); + _refreshMetadata(opCtx, nss, (currentMetadata ? 
currentMetadata.getMetadata() : nullptr)); return refreshStatusAndVersion.getStatus(); } -Status ShardingState::refreshMetadataNow(OperationContext* txn, +Status ShardingState::refreshMetadataNow(OperationContext* opCtx, const NamespaceString& nss, ChunkVersion* latestShardVersion) { ScopedCollectionMetadata currentMetadata; { - AutoGetCollection autoColl(txn, nss, MODE_IS); + AutoGetCollection autoColl(opCtx, nss, MODE_IS); - currentMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); + currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata(); } auto refreshLatestShardVersionStatus = - _refreshMetadata(txn, nss, currentMetadata.getMetadata()); + _refreshMetadata(opCtx, nss, currentMetadata.getMetadata()); if (!refreshLatestShardVersionStatus.isOK()) { return refreshLatestShardVersionStatus.getStatus(); } @@ -317,7 +317,7 @@ Status ShardingState::refreshMetadataNow(OperationContext* txn, // NOTE: This method can be called inside a database lock so it should never take any database // locks, perform I/O, or any long running operations. -Status ShardingState::initializeFromShardIdentity(OperationContext* txn, +Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx, const ShardIdentityType& shardIdentity) { invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); @@ -360,7 +360,7 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* txn, ShardedConnectionInfo::addHook(); try { - Status status = _globalInit(txn, configSvrConnStr, generateDistLockProcessId(txn)); + Status status = _globalInit(opCtx, configSvrConnStr, generateDistLockProcessId(opCtx)); if (status.isOK()) { log() << "initialized sharding components"; _setInitializationState(InitializationState::kInitialized); @@ -398,7 +398,7 @@ void ShardingState::_setInitializationState(InitializationState newState) { _initializationState.store(static_cast<uint32_t>(newState)); } -StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* txn) { +StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { // In sharded readOnly mode, we ignore the shardIdentity document on disk and instead *require* // a shardIdentity document to be passed through --overrideShardIdentity. if (storageGlobalParams.readOnly) { @@ -413,7 +413,7 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon if (!swOverrideShardIdentity.isOK()) { return swOverrideShardIdentity.getStatus(); } - auto status = initializeFromShardIdentity(txn, swOverrideShardIdentity.getValue()); + auto status = initializeFromShardIdentity(opCtx, swOverrideShardIdentity.getValue()); if (!status.isOK()) { return status; } @@ -448,12 +448,12 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon } // Load the shardIdentity document from disk. 
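initializeShardingAwarenessIfNeeded, in the hunk above, chooses its shardIdentity source by mode: in readOnly mode the on-disk document is ignored and --overrideShardIdentity is required, while in normal mode the document is loaded from the config collection and its absence just means the server is not yet shard aware. A simplified decision sketch (illustrative types; the real function returns StatusWith<bool> and validates several more combinations):

#include <optional>
#include <stdexcept>
#include <string>

struct ShardIdentity { std::string shardName; };

std::optional<ShardIdentity> resolveShardIdentity(
    bool readOnly,
    const std::optional<ShardIdentity>& overrideIdentity,
    const std::optional<ShardIdentity>& onDiskIdentity) {
    if (readOnly) {
        if (!overrideIdentity)
            throw std::runtime_error(
                "readOnly mode requires --overrideShardIdentity");
        return overrideIdentity;  // the on-disk document is ignored entirely
    }
    return onDiskIdentity;        // may be empty: not shard-aware yet
}

int main() {
    auto id = resolveShardIdentity(false, std::nullopt,
                                   ShardIdentity{"shard0001"});
    return id ? 0 : 1;
}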
- invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); BSONObj shardIdentityBSON; bool foundShardIdentity = false; try { - AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS); - foundShardIdentity = Helpers::findOne(txn, + AutoGetCollection autoColl(opCtx, NamespaceString::kConfigCollectionNamespace, MODE_IS); + foundShardIdentity = Helpers::findOne(opCtx, autoColl.getCollection(), BSON("_id" << ShardIdentityType::IdName), shardIdentityBSON); @@ -477,7 +477,7 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon if (!swShardIdentity.isOK()) { return swShardIdentity.getStatus(); } - auto status = initializeFromShardIdentity(txn, swShardIdentity.getValue()); + auto status = initializeFromShardIdentity(opCtx, swShardIdentity.getValue()); if (!status.isOK()) { return status; } @@ -496,8 +496,10 @@ StatusWith<bool> ShardingState::initializeShardingAwarenessIfNeeded(OperationCon } StatusWith<ChunkVersion> ShardingState::_refreshMetadata( - OperationContext* txn, const NamespaceString& nss, const CollectionMetadata* metadataForDiff) { - invariant(!txn->lockState()->isLocked()); + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionMetadata* metadataForDiff) { + invariant(!opCtx->lockState()->isLocked()); invariant(enabled()); @@ -533,8 +535,8 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( << (metadataForDiff ? metadataForDiff->getCollVersion().toString() : "(empty)"); remoteMetadata = stdx::make_unique<CollectionMetadata>(); - status = MetadataLoader::makeCollectionMetadata(txn, - grid.catalogClient(txn), + status = MetadataLoader::makeCollectionMetadata(opCtx, + grid.catalogClient(opCtx), nss.ns(), getShardName(), metadataForDiff, @@ -550,21 +552,21 @@ StatusWith<ChunkVersion> ShardingState::_refreshMetadata( } // Exclusive collection lock needed since we're now changing the metadata - ScopedTransaction transaction(txn, MODE_IX); - AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); + ScopedTransaction transaction(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, nss); + auto css = CollectionShardingState::get(opCtx, nss); if (!status.isOK()) { invariant(status == ErrorCodes::NamespaceNotFound); - css->refreshMetadata(txn, nullptr); + css->refreshMetadata(opCtx, nullptr); log() << "MetadataLoader took " << t.millis() << " ms and did not find the namespace"; return ChunkVersion::UNSHARDED(); } - css->refreshMetadata(txn, std::move(remoteMetadata)); + css->refreshMetadata(opCtx, std::move(remoteMetadata)); auto metadata = css->getMetadata(); @@ -588,11 +590,11 @@ boost::optional<NamespaceString> ShardingState::getActiveDonateChunkNss() { return _activeMigrationsRegistry.getActiveDonateChunkNss(); } -BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* txn) { - return _activeMigrationsRegistry.getActiveMigrationStatusReport(txn); +BSONObj ShardingState::getActiveMigrationStatusReport(OperationContext* opCtx) { + return _activeMigrationsRegistry.getActiveMigrationStatusReport(opCtx); } -void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { +void ShardingState::appendInfo(OperationContext* opCtx, BSONObjBuilder& builder) { const bool isEnabled = enabled(); builder.appendBool("enabled", isEnabled); if (!isEnabled) @@ -620,19 +622,19 @@ void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { versionB.done(); } -bool 
ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) { +bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const string& ns) { if (!enabled()) return false; - Client* client = txn->getClient(); + Client* client = opCtx->getClient(); // Shard version information received from mongos may either be attached to the Client or // directly to the OperationContext. return ShardedConnectionInfo::get(client, false) || - OperationShardingState::get(txn).hasShardVersion(); + OperationShardingState::get(opCtx).hasShardVersion(); } -Status ShardingState::updateShardIdentityConfigString(OperationContext* txn, +Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx, const std::string& newConnectionString) { BSONObj updateObj(ShardIdentityType::createConfigServerUpdateObject(newConnectionString)); @@ -643,9 +645,9 @@ Status ShardingState::updateShardIdentityConfigString(OperationContext* txn, updateReq.setLifecycle(&updateLifecycle); try { - AutoGetOrCreateDb autoDb(txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X); + AutoGetOrCreateDb autoDb(opCtx, NamespaceString::kConfigCollectionNamespace.db(), MODE_X); - auto result = update(txn, autoDb.getDb(), updateReq); + auto result = update(opCtx, autoDb.getDb(), updateReq); if (result.numMatched == 0) { warning() << "failed to update config string of shard identity document because " << "it does not exist. This shard could have been removed from the cluster"; diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 6686dc8deca..96b650ad803 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -111,7 +111,7 @@ public: */ Status canAcceptShardedCommands() const; - ConnectionString getConfigServer(OperationContext* txn); + ConnectionString getConfigServer(OperationContext* opCtx); std::string getShardName(); @@ -122,21 +122,21 @@ public: /** * Initializes the sharding state of this server from the shard identity document argument. */ - Status initializeFromShardIdentity(OperationContext* txn, + Status initializeFromShardIdentity(OperationContext* opCtx, const ShardIdentityType& shardIdentity); /** * Shuts down sharding machinery on the shard. */ - void shutDown(OperationContext* txn); + void shutDown(OperationContext* opCtx); /** * Updates the ShardRegistry's stored notion of the config server optime based on the * ConfigServerMetadata decoration attached to the OperationContext. */ - Status updateConfigServerOpTimeFromMetadata(OperationContext* txn); + Status updateConfigServerOpTimeFromMetadata(OperationContext* opCtx); - CollectionShardingState* getNS(const std::string& ns, OperationContext* txn); + CollectionShardingState* getNS(const std::string& ns, OperationContext* opCtx); /** * Iterates through all known sharded collections and marks them (in memory only) as not sharded * @@ -148,7 +148,7 @@ public: * Refreshes the local metadata based on whether the expected version is higher than what we * have cached.
*/ - Status onStaleShardVersion(OperationContext* txn, + Status onStaleShardVersion(OperationContext* opCtx, const NamespaceString& nss, const ChunkVersion& expectedVersion); @@ -174,13 +174,13 @@ public: * @return !OK if something else went wrong during reload * @return latestShardVersion the version that is now stored for this collection */ - Status refreshMetadataNow(OperationContext* txn, + Status refreshMetadataNow(OperationContext* opCtx, const NamespaceString& nss, ChunkVersion* latestShardVersion); - void appendInfo(OperationContext* txn, BSONObjBuilder& b); + void appendInfo(OperationContext* opCtx, BSONObjBuilder& b); - bool needCollectionMetadata(OperationContext* txn, const std::string& ns); + bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns); /** * Updates the config server field of the shardIdentity document with the given connection @@ -188,7 +188,7 @@ public: * * Note: this can return NotMaster error. */ - Status updateShardIdentityConfigString(OperationContext* txn, + Status updateShardIdentityConfigString(OperationContext* opCtx, const std::string& newConnectionString); /** @@ -229,7 +229,7 @@ public: * * Takes an IS lock on the namespace of the active migration, if one is active. */ - BSONObj getActiveMigrationStatusReport(OperationContext* txn); + BSONObj getActiveMigrationStatusReport(OperationContext* opCtx); /** * For testing only. Mock the initialization method used by initializeFromConfigConnString and @@ -266,7 +266,7 @@ public: * exception of the duplicate ShardRegistry reload in ShardRegistry::startup() (see * SERVER-26123). Outgoing networking calls to cluster members can now be made. */ - StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* txn); + StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx); private: // Map from a namespace into the sharding state for each collection we have @@ -307,7 +307,7 @@ private: * The metadataForDiff argument indicates that the specified metadata should be used as a base * from which to only load the differences. If nullptr is passed, a full reload will be done. */ - StatusWith<ChunkVersion> _refreshMetadata(OperationContext* txn, + StatusWith<ChunkVersion> _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss, const CollectionMetadata* metadataForDiff); diff --git a/src/mongo/db/s/sharding_state_command.cpp b/src/mongo/db/s/sharding_state_command.cpp index fe643086dd5..86606ef3598 100644 --- a/src/mongo/db/s/sharding_state_command.cpp +++ b/src/mongo/db/s/sharding_state_command.cpp @@ -67,13 +67,13 @@ public: out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); } - bool run(OperationContext* txn, + bool run(OperationContext* opCtx, const std::string& dbname, BSONObj& cmdObj, int options, std::string& errmsg, BSONObjBuilder& result) override { - ShardingState::get(txn)->appendInfo(txn, result); + ShardingState::get(opCtx)->appendInfo(opCtx, result); return true; } diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp index 7f17b748a90..72ee2b87028 100644 --- a/src/mongo/db/s/sharding_state_recovery.cpp +++ b/src/mongo/db/s/sharding_state_recovery.cpp @@ -182,17 +182,17 @@ private: * it has is to always move the opTime forward for a currently running server. It achieves this by * serializing the modify calls and reading the current opTime under X-lock on the admin database. 
diff --git a/src/mongo/db/s/sharding_state_recovery.cpp b/src/mongo/db/s/sharding_state_recovery.cpp
index 7f17b748a90..72ee2b87028 100644
--- a/src/mongo/db/s/sharding_state_recovery.cpp
+++ b/src/mongo/db/s/sharding_state_recovery.cpp
@@ -182,17 +182,17 @@ private:
  * it has is to always move the opTime forward for a currently running server. It achieves this by
  * serializing the modify calls and reading the current opTime under X-lock on the admin database.
  */
-Status modifyRecoveryDocument(OperationContext* txn,
+Status modifyRecoveryDocument(OperationContext* opCtx,
                               RecoveryDocument::ChangeType change,
                               const WriteConcernOptions& writeConcern) {
     try {
         // Use boost::optional so we can release the locks early
        boost::optional<AutoGetOrCreateDb> autoGetOrCreateDb;
-        autoGetOrCreateDb.emplace(txn, NamespaceString::kConfigCollectionNamespace.db(), MODE_X);
+        autoGetOrCreateDb.emplace(opCtx, NamespaceString::kConfigCollectionNamespace.db(), MODE_X);
 
         BSONObj updateObj = RecoveryDocument::createChangeObj(
             grid.shardRegistry()->getConfigServerConnectionString(),
-            ShardingState::get(txn)->getShardName(),
+            ShardingState::get(opCtx)->getShardName(),
             grid.configOpTime(),
             change);
 
@@ -205,7 +205,7 @@ Status modifyRecoveryDocument(OperationContext* txn,
         UpdateLifecycleImpl updateLifecycle(NamespaceString::kConfigCollectionNamespace);
         updateReq.setLifecycle(&updateLifecycle);
 
-        UpdateResult result = update(txn, autoGetOrCreateDb->getDb(), updateReq);
+        UpdateResult result = update(opCtx, autoGetOrCreateDb->getDb(), updateReq);
         invariant(result.numDocsModified == 1 || !result.upserted.isEmpty());
         invariant(result.numMatched <= 1);
 
@@ -213,8 +213,8 @@ Status modifyRecoveryDocument(OperationContext* txn,
         autoGetOrCreateDb = boost::none;
 
         WriteConcernResult writeConcernResult;
-        return waitForWriteConcern(txn,
-                                   repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
+        return waitForWriteConcern(opCtx,
+                                   repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
                                    writeConcern,
                                    &writeConcernResult);
     } catch (const DBException& ex) {
@@ -224,28 +224,29 @@ Status modifyRecoveryDocument(OperationContext* txn,
 
 }  // namespace
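modifyRecoveryDocument() is the single writer of the recovery document, and the pair of functions that follow build an increment/decrement protocol on top of it. A sketch of the intended calling pattern (illustrative only; the wrapper name is hypothetical, the two entry points are the ones declared in this diff, and the actual metadata work is elided):

    // Illustrative calling pattern, not in this diff: bracket a sharding
    // metadata operation with the increment/decrement protocol below.
    Status runMetadataOp(OperationContext* opCtx) {
        // Majority-upsert the recovery document before touching any metadata;
        // on failure it is unsafe to proceed.
        Status status = ShardingStateRecovery::startMetadataOp(opCtx);
        if (!status.isOK()) {
            return status;
        }
        // ... perform the actual metadata changes here ...
        // Best-effort decrement; failures are only logged by endMetadataOp().
        ShardingStateRecovery::endMetadataOp(opCtx);
        return Status::OK();
    }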
-Status ShardingStateRecovery::startMetadataOp(OperationContext* txn) {
+Status ShardingStateRecovery::startMetadataOp(OperationContext* opCtx) {
     Status upsertStatus =
-        modifyRecoveryDocument(txn, RecoveryDocument::Increment, kMajorityWriteConcern);
+        modifyRecoveryDocument(opCtx, RecoveryDocument::Increment, kMajorityWriteConcern);
 
     if (upsertStatus == ErrorCodes::WriteConcernFailed) {
         // Couldn't wait for the replication to complete, but the local write was performed. Clear
         // it up fast (without any waiting for journal or replication) and still treat it as
         // failure.
-        modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
+        modifyRecoveryDocument(opCtx, RecoveryDocument::Decrement, WriteConcernOptions());
     }
 
     return upsertStatus;
 }
 
-void ShardingStateRecovery::endMetadataOp(OperationContext* txn) {
-    Status status = modifyRecoveryDocument(txn, RecoveryDocument::Decrement, WriteConcernOptions());
+void ShardingStateRecovery::endMetadataOp(OperationContext* opCtx) {
+    Status status =
+        modifyRecoveryDocument(opCtx, RecoveryDocument::Decrement, WriteConcernOptions());
     if (!status.isOK()) {
         warning() << "Failed to decrement minOpTimeUpdaters due to " << redact(status);
     }
 }
 
-Status ShardingStateRecovery::recover(OperationContext* txn) {
+Status ShardingStateRecovery::recover(OperationContext* opCtx) {
     if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
         return Status::OK();
     }
 
@@ -253,9 +254,9 @@ Status ShardingStateRecovery::recover(OperationContext* txn) {
     BSONObj recoveryDocBSON;
 
     try {
-        AutoGetCollection autoColl(txn, NamespaceString::kConfigCollectionNamespace, MODE_IS);
+        AutoGetCollection autoColl(opCtx, NamespaceString::kConfigCollectionNamespace, MODE_IS);
         if (!Helpers::findOne(
-                txn, autoColl.getCollection(), RecoveryDocument::getQuery(), recoveryDocBSON)) {
+                opCtx, autoColl.getCollection(), RecoveryDocument::getQuery(), recoveryDocBSON)) {
             return Status::OK();
         }
     } catch (const DBException& ex) {
@@ -270,7 +271,7 @@ Status ShardingStateRecovery::recover(OperationContext* txn) {
 
     log() << "Sharding state recovery process found document " << redact(recoveryDoc.toBSON());
 
-    ShardingState* const shardingState = ShardingState::get(txn);
+    ShardingState* const shardingState = ShardingState::get(opCtx);
     invariant(shardingState->enabled());
 
     if (!recoveryDoc.getMinOpTimeUpdaters()) {
@@ -286,18 +287,18 @@ Status ShardingStateRecovery::recover(OperationContext* txn) {
 
     // Need to fetch the latest optime from the config server, so do a logging write
     Status status =
-        grid.catalogClient(txn)->logChange(txn,
-                                           "Sharding minOpTime recovery",
-                                           NamespaceString::kConfigCollectionNamespace.ns(),
-                                           recoveryDocBSON,
-                                           ShardingCatalogClient::kMajorityWriteConcern);
+        grid.catalogClient(opCtx)->logChange(opCtx,
+                                             "Sharding minOpTime recovery",
+                                             NamespaceString::kConfigCollectionNamespace.ns(),
+                                             recoveryDocBSON,
+                                             ShardingCatalogClient::kMajorityWriteConcern);
     if (!status.isOK())
         return status;
 
     log() << "Sharding state recovered. New config server opTime is " << grid.configOpTime();
 
     // Finally, clear the recovery document so next time we don't need to recover
-    status = modifyRecoveryDocument(txn, RecoveryDocument::Clear, kLocalWriteConcern);
+    status = modifyRecoveryDocument(opCtx, RecoveryDocument::Clear, kLocalWriteConcern);
     if (!status.isOK()) {
         warning() << "Failed to reset sharding state recovery document due to " << redact(status);
     }
diff --git a/src/mongo/db/s/sharding_state_recovery.h b/src/mongo/db/s/sharding_state_recovery.h
index c1b31e351fc..2960be472ba 100644
--- a/src/mongo/db/s/sharding_state_recovery.h
+++ b/src/mongo/db/s/sharding_state_recovery.h
@@ -53,13 +53,13 @@ public:
      * server's minOpTime after node failure. It is only safe to commence the operation after this
      * method returns an OK status.
      */
-    static Status startMetadataOp(OperationContext* txn);
+    static Status startMetadataOp(OperationContext* opCtx);
 
     /**
      * Marks the end of a sharding metadata operation, persisting the latest config server opTime at
     * the time of the call.
      */
-    static void endMetadataOp(OperationContext* txn);
+    static void endMetadataOp(OperationContext* opCtx);
 
     /**
      * Recovers the minimal config server opTime that the instance should be using for reading
@@ -71,7 +71,7 @@ public:
      * Returns OK if the minOpTime was successfully recovered or failure status otherwise. It is
      * unsafe to read and rely on any sharding metadata before this method has returned success.
      */
-    static Status recover(OperationContext* txn);
+    static Status recover(OperationContext* opCtx);
 };
 
 }  // namespace mongo
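The contract on recover() above is strict: nothing may read or rely on sharding metadata until it has returned OK. A hedged sketch of the startup-time sequencing that contract implies (illustrative only; the function name is hypothetical and not code from this commit):

    // Illustrative startup sequencing, not in this diff: gate all sharding
    // metadata reads on a successful recover().
    Status initShardingMetadataOnStartup(OperationContext* opCtx) {
        Status status = ShardingStateRecovery::recover(opCtx);
        if (!status.isOK()) {
            return status;  // metadata reads are still unsafe at this point
        }
        // ... safe to read sharding metadata and serve sharded commands ...
        return Status::OK();
    }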
diff --git a/src/mongo/db/s/sharding_state_test.cpp b/src/mongo/db/s/sharding_state_test.cpp
index dce1326b0b9..aa63085b60a 100644
--- a/src/mongo/db/s/sharding_state_test.cpp
+++ b/src/mongo/db/s/sharding_state_test.cpp
@@ -100,7 +100,7 @@ protected:
         // When sharding initialization is triggered, initialize sharding state as a shard server.
         serverGlobalParams.clusterRole = ClusterRole::ShardServer;
-        _shardingState.setGlobalInitMethodForTest([&](OperationContext* txn,
+        _shardingState.setGlobalInitMethodForTest([&](OperationContext* opCtx,
                                                       const ConnectionString& configConnStr,
                                                       StringData distLockProcessId) {
             auto status = initializeGlobalShardingStateForMongodForTest(configConnStr);
@@ -170,7 +170,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
     shardIdentity.setClusterId(OID::gen());
 
     shardingState()->setGlobalInitMethodForTest(
-        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+        [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
             return Status{ErrorCodes::ShutdownInProgress, "shutting down"};
         });
 
@@ -183,7 +183,7 @@ TEST_F(ShardingStateTest, InitWhilePreviouslyInErrorStateWillStayInErrorState) {
     // ShardingState is now in error state, attempting to call it again will still result in error.
     shardingState()->setGlobalInitMethodForTest(
-        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+        [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
             return Status::OK();
         });
 
@@ -213,7 +213,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithMatchingShardIdentitySucceeds) {
     shardIdentity2.setClusterId(clusterID);
 
     shardingState()->setGlobalInitMethodForTest(
-        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+        [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
             return Status{ErrorCodes::InternalError, "should not reach here"};
         });
 
@@ -241,7 +241,7 @@ TEST_F(ShardingStateTest, InitializeAgainWithSameReplSetNameSucceeds) {
     shardIdentity2.setClusterId(clusterID);
 
     shardingState()->setGlobalInitMethodForTest(
-        [](OperationContext* txn, const ConnectionString& connStr, StringData distLockProcessId) {
+        [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
             return Status{ErrorCodes::InternalError, "should not reach here"};
         });
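The test hunks above all lean on the same seam: setGlobalInitMethodForTest() swaps the global initialization hook for a throwaway lambda, so a test can force initializeFromShardIdentity() down an arbitrary path with no networking. The pattern, pulled out of the diff noise (the error code here is just an example):

    // Test-stub pattern used above: install an init hook that fails with a
    // chosen error code, then assert how ShardingState reacts to it.
    shardingState()->setGlobalInitMethodForTest(
        [](OperationContext* opCtx, const ConnectionString& connStr, StringData distLockProcessId) {
            return Status{ErrorCodes::InternalError, "simulated init failure"};
        });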
diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp
index 09d7f147b27..703143cfd67 100644
--- a/src/mongo/db/s/split_chunk_command.cpp
+++ b/src/mongo/db/s/split_chunk_command.cpp
@@ -65,7 +65,7 @@ namespace {
 
 const ReadPreferenceSetting kPrimaryOnlyReadPreference{ReadPreference::PrimaryOnly};
 
-bool checkIfSingleDoc(OperationContext* txn,
+bool checkIfSingleDoc(OperationContext* opCtx,
                       Collection* collection,
                       const IndexDescriptor* idx,
                       const ChunkType* chunk) {
@@ -73,7 +73,7 @@ bool checkIfSingleDoc(OperationContext* txn,
     BSONObj newmin = Helpers::toKeyFormat(kp.extendRangeBound(chunk->getMin(), false));
     BSONObj newmax = Helpers::toKeyFormat(kp.extendRangeBound(chunk->getMax(), true));
 
-    unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
+    unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(opCtx,
                                                              collection,
                                                              idx,
                                                              newmin,
@@ -100,16 +100,16 @@ bool checkIfSingleDoc(OperationContext* txn,
 // using the specified splitPoints. Returns false if the metadata's chunks don't match
 // the new chunk boundaries exactly.
 //
-bool _checkMetadataForSuccess(OperationContext* txn,
+bool _checkMetadataForSuccess(OperationContext* opCtx,
                               const NamespaceString& nss,
                               const ChunkRange& chunkRange,
                               const std::vector<BSONObj>& splitKeys) {
     ScopedCollectionMetadata metadataAfterSplit;
     {
-        AutoGetCollection autoColl(txn, nss, MODE_IS);
+        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
 
         // Get collection metadata
-        metadataAfterSplit = CollectionShardingState::get(txn, nss.ns())->getMetadata();
+        metadataAfterSplit = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
     }
 
     auto newChunkBounds(splitKeys);
@@ -167,13 +167,13 @@ public:
         return parseNsFullyQualified(dbname, cmdObj);
     }
 
-    bool run(OperationContext* txn,
+    bool run(OperationContext* opCtx,
              const std::string& dbname,
             BSONObj& cmdObj,
             int options,
             std::string& errmsg,
             BSONObjBuilder& result) override {
-        auto shardingState = ShardingState::get(txn);
+        auto shardingState = ShardingState::get(opCtx);
         uassertStatusOK(shardingState->canAcceptShardedCommands());
 
         //
@@ -233,8 +233,8 @@ public:
         const string whyMessage(str::stream() << "splitting chunk [" << min << ", " << max
                                               << ") in " << nss.toString());
-        auto scopedDistLock = grid.catalogClient(txn)->getDistLockManager()->lock(
-            txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
+        auto scopedDistLock = grid.catalogClient(opCtx)->getDistLockManager()->lock(
+            opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
         if (!scopedDistLock.isOK()) {
             errmsg = str::stream() << "could not acquire collection lock for " << nss.toString()
                                    << " to split chunk [" << redact(min) << "," << redact(max)
@@ -245,7 +245,7 @@ public:
 
         // Always check our version remotely
         ChunkVersion shardVersion;
-        Status refreshStatus = shardingState->refreshMetadataNow(txn, nss, &shardVersion);
+        Status refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &shardVersion);
 
         if (!refreshStatus.isOK()) {
             errmsg = str::stream() << "splitChunk cannot split chunk "
@@ -266,7 +266,7 @@ public:
             return false;
         }
 
-        const auto& oss = OperationShardingState::get(txn);
+        const auto& oss = OperationShardingState::get(opCtx);
         uassert(ErrorCodes::InvalidOptions, "collection version is missing", oss.hasShardVersion());
 
         // Even though the splitChunk command transmits a value in the operation's shardVersion
@@ -286,10 +286,10 @@ public:
         ScopedCollectionMetadata collMetadata;
         {
-            AutoGetCollection autoColl(txn, nss, MODE_IS);
+            AutoGetCollection autoColl(opCtx, nss, MODE_IS);
 
             // Get collection metadata
-            collMetadata = CollectionShardingState::get(txn, nss.ns())->getMetadata();
+            collMetadata = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
         }
 
         // With nonzero shard version, we must have metadata
@@ -313,8 +313,8 @@ public:
             request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
         auto cmdResponseStatus =
-            Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
-                txn,
+            Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+                opCtx,
                 kPrimaryOnlyReadPreference,
                 "admin",
                 configCmdObj,
@@ -325,7 +325,7 @@ public:
         //
         {
             ChunkVersion unusedShardVersion;
-            refreshStatus = shardingState->refreshMetadataNow(txn, nss, &unusedShardVersion);
+            refreshStatus = shardingState->refreshMetadataNow(opCtx, nss, &unusedShardVersion);
 
             if (!refreshStatus.isOK()) {
                 errmsg = str::stream() << "failed to refresh metadata for split chunk ["
@@ -368,7 +368,7 @@ public:
         // succeeds, thus the automatic retry fails with a precondition violation, for example.
         //
         if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
-            _checkMetadataForSuccess(txn, nss, chunkRange, splitKeys)) {
+            _checkMetadataForSuccess(opCtx, nss, chunkRange, splitKeys)) {
 
             LOG(1) << "splitChunk [" << redact(min) << "," << redact(max)
                    << ") has already been committed.";
@@ -381,7 +381,7 @@ public:
         // Select chunk to move out for "top chunk optimization".
         KeyPattern shardKeyPattern(collMetadata->getKeyPattern());
 
-        AutoGetCollection autoColl(txn, nss, MODE_IS);
+        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
 
         Collection* const collection = autoColl.getCollection();
         if (!collection) {
@@ -393,7 +393,7 @@ public:
         // Allow multiKey based on the invariant that shard keys must be single-valued. Therefore,
         // any multi-key index prefixed by shard key cannot be multikey over the shard key fields.
         IndexDescriptor* idx =
-            collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, keyPatternObj, false);
+            collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPatternObj, false);
         if (!idx) {
             return true;
         }
@@ -407,11 +407,11 @@ public:
         frontChunk.setMax(splitKeys.front());
 
         if (shardKeyPattern.globalMax().woCompare(backChunk.getMax()) == 0 &&
-            checkIfSingleDoc(txn, collection, idx, &backChunk)) {
+            checkIfSingleDoc(opCtx, collection, idx, &backChunk)) {
             result.append("shouldMigrate",
                           BSON("min" << backChunk.getMin() << "max" << backChunk.getMax()));
         } else if (shardKeyPattern.globalMin().woCompare(frontChunk.getMin()) == 0 &&
-                   checkIfSingleDoc(txn, collection, idx, &frontChunk)) {
+                   checkIfSingleDoc(opCtx, collection, idx, &frontChunk)) {
             result.append("shouldMigrate",
                           BSON("min" << frontChunk.getMin() << "max" << frontChunk.getMax()));
         }
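Two details in the file above are worth pulling out of the diff noise. First, a failed-looking commit response is not trusted at face value: the command refreshes metadata and asks _checkMetadataForSuccess() whether the requested boundaries already exist, which covers the case where a previous attempt committed but its response was lost. Second, the "top chunk optimization": if the split leaves a single-document chunk at an extreme of the shard key space, the command suggests migrating it. Restated plainly, with the same calls as above and simplified control flow:

    // Restatement of the two checks above (simplified, not new behavior).
    // 1) Treat a failed-looking commit as success if the refreshed metadata
    //    already reflects the requested split boundaries.
    if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
        _checkMetadataForSuccess(opCtx, nss, chunkRange, splitKeys)) {
        // A previous attempt committed this split; report success.
    }

    // 2) A one-document chunk touching the MaxKey edge of the key space is a
    //    hotspot candidate, so suggest moving it to another shard.
    if (shardKeyPattern.globalMax().woCompare(backChunk.getMax()) == 0 &&
        checkIfSingleDoc(opCtx, collection, idx, &backChunk)) {
        result.append("shouldMigrate",
                      BSON("min" << backChunk.getMin() << "max" << backChunk.getMax()));
    }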
diff --git a/src/mongo/db/s/split_vector_command.cpp b/src/mongo/db/s/split_vector_command.cpp
index f02a7d68fa3..73a424e5d27 100644
--- a/src/mongo/db/s/split_vector_command.cpp
+++ b/src/mongo/db/s/split_vector_command.cpp
@@ -112,7 +112,7 @@ public:
         return parseNsFullyQualified(dbname, cmdObj);
     }
 
-    bool run(OperationContext* txn,
+    bool run(OperationContext* opCtx,
              const string& dbname,
             BSONObj& jsobj,
             int options,
@@ -157,7 +157,7 @@ public:
 
         {
             // Get the size estimate for this namespace
-            AutoGetCollection autoColl(txn, nss, MODE_IS);
+            AutoGetCollection autoColl(opCtx, nss, MODE_IS);
 
             Collection* const collection = autoColl.getCollection();
             if (!collection) {
@@ -169,7 +169,7 @@ public:
             // Therefore, any multi-key index prefixed by shard key cannot be multikey over
             // the shard key fields.
             IndexDescriptor* idx =
-                collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, keyPattern, false);
+                collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
             if (idx == NULL) {
                 errmsg = (string) "couldn't find index over splitting key " +
                     keyPattern.clientReadable().toString();
@@ -186,8 +186,8 @@ public:
                 max = Helpers::toKeyFormat(kp.extendRangeBound(max, false));
             }
 
-            const long long recCount = collection->numRecords(txn);
-            const long long dataSize = collection->dataSize(txn);
+            const long long recCount = collection->numRecords(opCtx);
+            const long long dataSize = collection->dataSize(opCtx);
 
             //
             // 1.b Now that we have the size estimate, go over the remaining parameters and apply
@@ -260,7 +260,7 @@ public:
             long long numChunks = 0;
 
             unique_ptr<PlanExecutor> exec(
-                InternalPlanner::indexScan(txn,
+                InternalPlanner::indexScan(opCtx,
                                            collection,
                                            idx,
                                            min,
@@ -336,7 +336,7 @@ public:
                     log() << "splitVector doing another cycle because of force, keyCount now: "
                           << keyCount;
 
-                    exec = InternalPlanner::indexScan(txn,
+                    exec = InternalPlanner::indexScan(opCtx,
                                                       collection,
                                                       idx,
                                                       min,
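splitVector's size estimate above feeds the split-point arithmetic that the diff elides at step 1.b: the scan over the shard key index emits a split point every keyCount keys. A hedged sketch of that arithmetic, inferred from the surrounding code rather than shown in this diff (maxChunkSize is assumed to be the byte budget the caller supplied):

    // Assumed arithmetic for the elided step 1.b, shown for orientation only:
    // aim for chunks of roughly half maxChunkSize, so derive the number of
    // documents per prospective chunk from the average record size.
    const long long avgRecSize = dataSize / recCount;      // bytes per document
    long long keyCount = maxChunkSize / (2 * avgRecSize);  // docs per half-chunk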
diff --git a/src/mongo/db/s/unset_sharding_command.cpp b/src/mongo/db/s/unset_sharding_command.cpp
index 9aa63819135..7155b35bce8 100644
--- a/src/mongo/db/s/unset_sharding_command.cpp
+++ b/src/mongo/db/s/unset_sharding_command.cpp
@@ -72,13 +72,13 @@ public:
         out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
     }
 
-    bool run(OperationContext* txn,
+    bool run(OperationContext* opCtx,
              const std::string& dbname,
             BSONObj& cmdObj,
             int options,
             std::string& errmsg,
             BSONObjBuilder& result) override {
-        ShardedConnectionInfo::reset(txn->getClient());
+        ShardedConnectionInfo::reset(opCtx->getClient());
         return true;
     }