summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/parallel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/client/parallel.cpp')
-rw-r--r--src/mongo/s/client/parallel.cpp52
1 files changed, 27 insertions, 25 deletions
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp
index 1b16dee2032..89d2836dc92 100644
--- a/src/mongo/s/client/parallel.cpp
+++ b/src/mongo/s/client/parallel.cpp
@@ -211,13 +211,13 @@ ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
_done = true;
}
-void ParallelSortClusteredCursor::init(OperationContext* txn) {
+void ParallelSortClusteredCursor::init(OperationContext* opCtx) {
if (_didInit)
return;
_didInit = true;
if (!_qSpec.isEmpty()) {
- fullInit(txn);
+ fullInit(opCtx);
} else {
// You can only get here by using the legacy constructor
// TODO: Eliminate this
@@ -316,17 +316,17 @@ void ParallelSortClusteredCursor::_finishCons() {
17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty());
}
-void ParallelSortClusteredCursor::fullInit(OperationContext* txn) {
- startInit(txn);
- finishInit(txn);
+void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) {
+ startInit(opCtx);
+ finishInit(opCtx);
}
-void ParallelSortClusteredCursor::_markStaleNS(OperationContext* txn,
+void ParallelSortClusteredCursor::_markStaleNS(OperationContext* opCtx,
const NamespaceString& staleNS,
const StaleConfigException& e,
bool& forceReload) {
if (e.requiresFullReload()) {
- Grid::get(txn)->catalogCache()->invalidate(staleNS.db());
+ Grid::get(opCtx)->catalogCache()->invalidate(staleNS.db());
}
if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end())
@@ -344,10 +344,10 @@ void ParallelSortClusteredCursor::_markStaleNS(OperationContext* txn,
forceReload = tries > 2;
}
-void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn,
+void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* opCtx,
const NamespaceString& staleNS,
bool forceReload) {
- auto scopedCMStatus = ScopedChunkManager::get(txn, staleNS);
+ auto scopedCMStatus = ScopedChunkManager::get(opCtx, staleNS);
if (!scopedCMStatus.isOK()) {
log() << "cannot reload database info for stale namespace " << staleNS.ns();
return;
@@ -356,11 +356,11 @@ void ParallelSortClusteredCursor::_handleStaleNS(OperationContext* txn,
const auto& scopedCM = scopedCMStatus.getValue();
// Reload chunk manager, potentially forcing the namespace
- scopedCM.db()->getChunkManagerIfExists(txn, staleNS.ns(), true, forceReload);
+ scopedCM.db()->getChunkManagerIfExists(opCtx, staleNS.ns(), true, forceReload);
}
void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
- OperationContext* txn,
+ OperationContext* opCtx,
std::shared_ptr<ParallelConnectionState> state,
const ShardId& shardId,
std::shared_ptr<Shard> primary,
@@ -377,7 +377,8 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
// Setup conn
if (!state->conn) {
- const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId));
+ const auto shard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
state->conn.reset(new ShardConnection(shard->getConnString(), ns.ns(), manager));
}
@@ -440,7 +441,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
}
}
-void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
+void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns());
@@ -458,7 +459,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
shared_ptr<Shard> primary;
{
- auto scopedCMStatus = ScopedChunkManager::get(txn, nss);
+ auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
if (scopedCMStatus != ErrorCodes::NamespaceNotFound) {
uassertStatusOK(scopedCMStatus.getStatus());
const auto& scopedCM = scopedCMStatus.getValue();
@@ -476,7 +477,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
<< manager->getVersion().toString() << "]";
}
- manager->getShardIdsForQuery(txn,
+ manager->getShardIdsForQuery(opCtx,
!_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter(),
!_cInfo.isEmpty() ? _cInfo.cmdCollation : BSONObj(),
&shardIds);
@@ -551,7 +552,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
mdata.pcState = std::make_shared<ParallelConnectionState>();
auto state = mdata.pcState;
- setupVersionAndHandleSlaveOk(txn, state, shardId, primary, nss, vinfo, manager);
+ setupVersionAndHandleSlaveOk(opCtx, state, shardId, primary, nss, vinfo, manager);
const string& ns = _qSpec.ns();
@@ -643,7 +644,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
// Probably need to retry fully
bool forceReload;
- _markStaleNS(txn, staleNS, e, forceReload);
+ _markStaleNS(opCtx, staleNS, e, forceReload);
LOG(1) << "stale config of ns " << staleNS
<< " during initialization, will retry with forced : " << forceReload
@@ -654,10 +655,10 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace "
<< staleNS;
- _handleStaleNS(txn, staleNS, forceReload);
+ _handleStaleNS(opCtx, staleNS, forceReload);
// Restart with new chunk manager
- startInit(txn);
+ startInit(opCtx);
return;
} catch (SocketException& e) {
warning() << "socket exception when initializing on " << shardId
@@ -727,7 +728,7 @@ void ParallelSortClusteredCursor::startInit(OperationContext* txn) {
}
}
-void ParallelSortClusteredCursor::finishInit(OperationContext* txn) {
+void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
bool specialVersion = _cInfo.versionedNS.size() > 0;
string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns();
@@ -867,7 +868,7 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) {
const StaleConfigException& exception = i->second;
bool forceReload;
- _markStaleNS(txn, staleNS, exception, forceReload);
+ _markStaleNS(opCtx, staleNS, exception, forceReload);
LOG(1) << "stale config of ns " << staleNS
<< " on finishing query, will retry with forced : " << forceReload
@@ -878,13 +879,13 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) {
warning() << "versioned ns " << ns << " doesn't match stale config namespace "
<< staleNS;
- _handleStaleNS(txn, staleNS, forceReload);
+ _handleStaleNS(opCtx, staleNS, forceReload);
}
}
// Re-establish connections we need to
- startInit(txn);
- finishInit(txn);
+ startInit(opCtx);
+ finishInit(opCtx);
return;
}
@@ -924,7 +925,8 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* txn) {
_cursors[index].reset(mdata.pcState->cursor.get(), &mdata);
- const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId));
+ const auto shard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
_servers.insert(shard->getConnString().toString());
index++;