diff options
Diffstat (limited to 'src/mongo/s/client/parallel.cpp')
-rw-r--r-- | src/mongo/s/client/parallel.cpp | 52 |
1 files changed, 27 insertions, 25 deletions
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index 1b16dee2032..89d2836dc92 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -211,13 +211,13 @@ ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { _done = true; } -void ParallelSortClusteredCursor::init(OperationContext* txn) { +void ParallelSortClusteredCursor::init(OperationContext* opCtx) { if (_didInit) return; _didInit = true; if (!_qSpec.isEmpty()) { - fullInit(txn); + fullInit(opCtx); } else { // You can only get here by using the legacy constructor // TODO: Eliminate this @@ -316,17 +316,17 @@ void ParallelSortClusteredCursor::_finishCons() { 17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty()); } -void ParallelSortClusteredCursor::fullInit(OperationContext* txn) { - startInit(txn); - finishInit(txn); +void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) { + startInit(opCtx); + finishInit(opCtx); } -void ParallelSortClusteredCursor::_markStaleNS(OperationContext* txn, +void ParallelSortClusteredCursor::_markStaleNS(OperationContext* opCtx, const NamespaceString& staleNS, const StaleConfigException& e, bool& forceReload) { if (e.requiresFullReload()) { - Grid::get(txn)->catalogCache()->invalidate(staleNS.db()); + Grid::get(opCtx)->catalogCache()->invalidate(staleNS.db()); } if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) @@ -344,10 +344,10 @@ void ParallelSortClusteredCursor::_markStaleNS(OperationContext* txn, forceReload = tries > 2; } -void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn, +void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* opCtx, const NamespaceString& staleNS, bool forceReload) { - auto scopedCMStatus = ScopedChunkManager::get(txn, staleNS); + auto scopedCMStatus = ScopedChunkManager::get(opCtx, staleNS); if (!scopedCMStatus.isOK()) { log() << "cannot reload database info for stale namespace " << staleNS.ns(); return; @@ -356,11 +356,11 @@ void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn, const auto& scopedCM = scopedCMStatus.getValue(); // Reload chunk manager, potentially forcing the namespace - scopedCM.db()->getChunkManagerIfExists(txn, staleNS.ns(), true, forceReload); + scopedCM.db()->getChunkManagerIfExists(opCtx, staleNS.ns(), true, forceReload); } void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( - OperationContext* txn, + OperationContext* opCtx, std::shared_ptr<ParallelConnectionState> state, const ShardId& shardId, std::shared_ptr<Shard> primary, @@ -377,7 +377,8 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( // Setup conn if (!state->conn) { - const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId)); + const auto shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); state->conn.reset(new ShardConnection(shard->getConnString(), ns.ns(), manager)); } @@ -440,7 +441,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( } } -void ParallelSortClusteredCursor::startInit(OperationContext* txn) { +void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults); const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns()); @@ -458,7 +459,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { shared_ptr<Shard> primary; { - auto scopedCMStatus = ScopedChunkManager::get(txn, nss); + auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss); if (scopedCMStatus != ErrorCodes::NamespaceNotFound) { uassertStatusOK(scopedCMStatus.getStatus()); const auto& scopedCM = scopedCMStatus.getValue(); @@ -476,7 +477,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { << manager->getVersion().toString() << "]"; } - manager->getShardIdsForQuery(txn, + manager->getShardIdsForQuery(opCtx, !_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter(), !_cInfo.isEmpty() ? _cInfo.cmdCollation : BSONObj(), &shardIds); @@ -551,7 +552,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { mdata.pcState = std::make_shared<ParallelConnectionState>(); auto state = mdata.pcState; - setupVersionAndHandleSlaveOk(txn, state, shardId, primary, nss, vinfo, manager); + setupVersionAndHandleSlaveOk(opCtx, state, shardId, primary, nss, vinfo, manager); const string& ns = _qSpec.ns(); @@ -643,7 +644,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { // Probably need to retry fully bool forceReload; - _markStaleNS(txn, staleNS, e, forceReload); + _markStaleNS(opCtx, staleNS, e, forceReload); LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry with forced : " << forceReload @@ -654,10 +655,10 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace " << staleNS; - _handleStaleNS(txn, staleNS, forceReload); + _handleStaleNS(opCtx, staleNS, forceReload); // Restart with new chunk manager - startInit(txn); + startInit(opCtx); return; } catch (SocketException& e) { warning() << "socket exception when initializing on " << shardId @@ -727,7 +728,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) { } } -void ParallelSortClusteredCursor::finishInit(OperationContext* txn) { +void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) { bool returnPartial = (_qSpec.options() & QueryOption_PartialResults); bool specialVersion = _cInfo.versionedNS.size() > 0; string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns(); @@ -867,7 +868,7 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) { const StaleConfigException& exception = i->second; bool forceReload; - _markStaleNS(txn, staleNS, exception, forceReload); + _markStaleNS(opCtx, staleNS, exception, forceReload); LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry with forced : " << forceReload @@ -878,13 +879,13 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) { warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS; - _handleStaleNS(txn, staleNS, forceReload); + _handleStaleNS(opCtx, staleNS, forceReload); } } // Re-establish connections we need to - startInit(txn); - finishInit(txn); + startInit(opCtx); + finishInit(opCtx); return; } @@ -924,7 +925,8 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) { _cursors[index].reset(mdata.pcState->cursor.get(), &mdata); - const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId)); + const auto shard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); _servers.insert(shard->getConnString().toString()); index++; |