Diffstat (limited to 'src/mongo/s/commands/cluster_write.cpp')
 src/mongo/s/commands/cluster_write.cpp | 53 ++++++++++++++++++++++++++++-------------------------
 1 file changed, 28 insertions(+), 25 deletions(-)
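The diff below is a mechanical rename of the OperationContext* parameter from txn to opCtx throughout cluster_write.cpp, presumably because an OperationContext is not a transaction and the old name was misleading; statements that the longer name pushes past the column limit are reflowed. The pattern, shown on one signature taken from this file (the types come from the mongo codebase):

    // Before: the operation context argument was conventionally named 'txn'
    void reloadChunkManager(OperationContext* txn, const NamespaceString& nss);

    // After: renamed to 'opCtx'; every call site in the file changes the same way
    void reloadChunkManager(OperationContext* opCtx, const NamespaceString& nss);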
diff --git a/src/mongo/s/commands/cluster_write.cpp b/src/mongo/s/commands/cluster_write.cpp
index 3468ca6dcf4..730d5e8a178 100644
--- a/src/mongo/s/commands/cluster_write.cpp
+++ b/src/mongo/s/commands/cluster_write.cpp
@@ -66,9 +66,9 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
dassert(response->isValid(NULL));
}
-void reloadChunkManager(OperationContext* txn, const NamespaceString& nss) {
- auto config = uassertStatusOK(ScopedShardDatabase::getExisting(txn, nss.db()));
- config.db()->getChunkManagerIfExists(txn, nss.ns(), true);
+void reloadChunkManager(OperationContext* opCtx, const NamespaceString& nss) {
+ auto config = uassertStatusOK(ScopedShardDatabase::getExisting(opCtx, nss.db()));
+ config.db()->getChunkManagerIfExists(opCtx, nss.ns(), true);
}
/**
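A hedged reading of the hunk above: getChunkManagerIfExists appears to take the namespace plus a reload flag, so passing true forces the cached routing information for nss to be refreshed, and the return value is discarded because only the refresh side effect is wanted. A minimal self-contained model of that idiom, with a hypothetical RoutingCache standing in for the sharding catalog types:

    #include <map>
    #include <string>

    // Hypothetical stand-in for the routing-table cache; the real
    // ScopedShardDatabase/ChunkManager types live in the mongo codebase.
    class RoutingCache {
    public:
        // Returns the cached entry, refreshing it first when forceReload is set.
        const std::string* getIfExists(const std::string& ns, bool forceReload) {
            if (forceReload)
                _cache[ns] = loadFromConfigServers(ns);
            auto it = _cache.find(ns);
            return it == _cache.end() ? nullptr : &it->second;
        }

    private:
        std::string loadFromConfigServers(const std::string& ns) {
            return "routing info for " + ns;  // placeholder fetch
        }
        std::map<std::string, std::string> _cache;
    };

    // Mirrors reloadChunkManager(): called purely for the refresh side effect.
    void reloadRouting(RoutingCache& cache, const std::string& ns) {
        cache.getIfExists(ns, /*forceReload=*/true);
    }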
@@ -104,7 +104,7 @@ uint64_t calculateDesiredChunkSize(uint64_t maxChunkSizeBytes, uint64_t numChunk
* ordered list of ascending/descending field names. For example {a : 1, b : -1} is not special, but
* {a : "hashed"} is.
*/
-BSONObj findExtremeKeyForShard(OperationContext* txn,
+BSONObj findExtremeKeyForShard(OperationContext* opCtx,
const NamespaceString& nss,
const ShardId& shardId,
const ShardKeyPattern& shardKeyPattern,
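The doc comment opening this hunk draws the line between ordered key patterns such as {a : 1, b : -1} and special ones such as {a : "hashed"}: only ordered patterns have a meaningful smallest/largest document for findExtremeKeyForShard to find, mirroring the KeyPattern::isOrderedKeyPattern() check used further down. A rough classification sketch over a flattened pattern, using plain C++ types rather than BSONObj (the representation is an assumption):

    #include <string>
    #include <variant>
    #include <vector>

    // A shard key field is either an ordered direction (1 or -1) or a
    // special string such as "hashed" (simplified stand-in for BSON).
    using KeyField = std::variant<int, std::string>;

    // True when every field is +/-1: {a : 1, b : -1} yes, {a : "hashed"} no.
    bool isOrderedKeyPattern(const std::vector<KeyField>& pattern) {
        for (const auto& field : pattern) {
            const int* dir = std::get_if<int>(&field);
            if (!dir || (*dir != 1 && *dir != -1))
                return false;
        }
        return true;
    }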
@@ -130,7 +130,8 @@ BSONObj findExtremeKeyForShard(OperationContext* txn,
// Find the extreme key
const auto shardConnStr = [&]() {
- const auto shard = uassertStatusOK(Grid::get(txn)->shardRegistry()->getShard(txn, shardId));
+ const auto shard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
return shard->getConnString();
}();
@@ -172,8 +173,10 @@ BSONObj findExtremeKeyForShard(OperationContext* txn,
/**
* Splits the chunks touched, based on the targeter stats, if needed.
*/
-void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const TargeterStats& stats) {
- auto scopedCMStatus = ScopedChunkManager::get(txn, nss);
+void splitIfNeeded(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const TargeterStats& stats) {
+ auto scopedCMStatus = ScopedChunkManager::get(opCtx, nss);
if (!scopedCMStatus.isOK()) {
warning() << "failed to get collection information for " << nss
<< " while checking for auto-split" << causedBy(scopedCMStatus.getStatus());
@@ -196,7 +199,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ
return;
}
- updateChunkWriteStatsAndSplitIfNeeded(txn, scopedCM.cm().get(), chunk.get(), it->second);
+ updateChunkWriteStatsAndSplitIfNeeded(opCtx, scopedCM.cm().get(), chunk.get(), it->second);
}
}
@@ -205,7 +208,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ
ClusterWriter::ClusterWriter(bool autoSplit, int timeoutMillis)
: _autoSplit(autoSplit), _timeoutMillis(timeoutMillis) {}
-void ClusterWriter::write(OperationContext* txn,
+void ClusterWriter::write(OperationContext* opCtx,
const BatchedCommandRequest& origRequest,
BatchedCommandResponse* response) {
// Add _ids to insert request if req'd
@@ -291,14 +294,14 @@ void ClusterWriter::write(OperationContext* txn,
request = requestWithWriteConcern.get();
}
- Grid::get(txn)->catalogClient(txn)->writeConfigServerDirect(txn, *request, response);
+ Grid::get(opCtx)->catalogClient(opCtx)->writeConfigServerDirect(opCtx, *request, response);
} else {
TargeterStats targeterStats;
{
ChunkManagerTargeter targeter(request->getTargetingNSS(), &targeterStats);
- Status targetInitStatus = targeter.init(txn);
+ Status targetInitStatus = targeter.init(opCtx);
if (!targetInitStatus.isOK()) {
toBatchError(Status(targetInitStatus.code(),
str::stream()
@@ -313,11 +316,11 @@ void ClusterWriter::write(OperationContext* txn,
DBClientMultiCommand dispatcher;
BatchWriteExec exec(&targeter, &dispatcher);
- exec.executeBatch(txn, *request, response, &_stats);
+ exec.executeBatch(opCtx, *request, response, &_stats);
}
if (_autoSplit) {
- splitIfNeeded(txn, request->getNS(), targeterStats);
+ splitIfNeeded(opCtx, request->getNS(), targeterStats);
}
}
}
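Pulling the visible branches of ClusterWriter::write together: metadata writes go straight to the config servers through the catalog client, while ordinary writes are targeted per shard by a ChunkManagerTargeter, executed by BatchWriteExec, and followed by an auto-split pass when _autoSplit is set. A compressed dispatch model (the config-namespace test is an assumption, since the actual condition sits outside the shown hunks):

    #include <iostream>
    #include <string>

    // Toy dispatch mirroring ClusterWriter::write().
    void clusterWriteModel(const std::string& ns, bool autoSplit) {
        const bool isConfigWrite = ns.rfind("config.", 0) == 0;  // assumed test
        if (isConfigWrite) {
            std::cout << "send " << ns << " write directly to the config servers\n";
            return;
        }
        std::cout << "target " << ns << " by shard key and execute the batch\n";
        if (autoSplit)
            std::cout << "check the touched chunks for auto-split\n";
    }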
@@ -326,7 +329,7 @@ const BatchWriteExecStats& ClusterWriter::getStats() {
return _stats;
}
-void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
+void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* opCtx,
ChunkManager* manager,
Chunk* chunk,
long dataWritten) {
@@ -334,7 +337,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
// bubbled up on the client connection doing a write.
LastError::Disabled d(&LastError::get(cc()));
- const auto balancerConfig = Grid::get(txn)->getBalancerConfiguration();
+ const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
const bool minIsInf =
(0 == manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(chunk->getMin()));
@@ -370,7 +373,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
try {
// Ensure we have the most up-to-date balancer configuration
- uassertStatusOK(balancerConfig->refreshAndCheck(txn));
+ uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
if (!balancerConfig->getShouldAutoSplit()) {
return;
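The gate above refreshes the balancer settings before consulting them, so the auto-split decision never acts on stale configuration. A minimal model of refresh-then-check (the method names echo the diff; their bodies here are placeholders):

    // Toy balancer settings: refresh from the config servers, then read.
    struct BalancerConfigModel {
        bool shouldAutoSplit = true;
        void refreshAndCheck() {
            // placeholder for the config-server round trip; throws on failure
        }
    };

    // True when auto-split may proceed; a throw is handled by the caller's
    // try/catch, as in updateChunkWriteStatsAndSplitIfNeeded().
    bool autoSplitAllowed(BalancerConfigModel& config) {
        config.refreshAndCheck();
        return config.shouldAutoSplit;
    }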
@@ -393,7 +396,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
}();
auto splitPoints =
- uassertStatusOK(shardutil::selectChunkSplitPoints(txn,
+ uassertStatusOK(shardutil::selectChunkSplitPoints(opCtx,
chunk->getShardId(),
nss,
manager->getShardKeyPattern(),
@@ -425,13 +428,13 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
if (KeyPattern::isOrderedKeyPattern(manager->getShardKeyPattern().toBSON())) {
if (minIsInf) {
BSONObj key = findExtremeKeyForShard(
- txn, nss, chunk->getShardId(), manager->getShardKeyPattern(), true);
+ opCtx, nss, chunk->getShardId(), manager->getShardKeyPattern(), true);
if (!key.isEmpty()) {
splitPoints.front() = key.getOwned();
}
} else if (maxIsInf) {
BSONObj key = findExtremeKeyForShard(
- txn, nss, chunk->getShardId(), manager->getShardKeyPattern(), false);
+ opCtx, nss, chunk->getShardId(), manager->getShardKeyPattern(), false);
if (!key.isEmpty()) {
splitPoints.back() = key.getOwned();
}
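This block is the top-chunk special case: with an ordered shard key, a chunk pinned at -inf or +inf is typically absorbing monotonically increasing (or decreasing) inserts, so the first or last computed split point is replaced with the actual extreme key present on the shard, carving off a small cap chunk that is cheap to migrate. The adjustment, sketched with integer split points:

    #include <vector>

    // Mirrors the splitPoints.front()/back() replacement above: substitute
    // the boundary split point with the extreme key found on the shard.
    void adjustSplitPointsForTopChunk(std::vector<long>& splitPoints,
                                      bool minIsInf,
                                      bool maxIsInf,
                                      long extremeKeyOnShard) {
        if (splitPoints.empty())
            return;
        if (minIsInf)
            splitPoints.front() = extremeKeyOnShard;  // smallest key present
        else if (maxIsInf)
            splitPoints.back() = extremeKeyOnShard;   // largest key present
    }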
@@ -439,7 +442,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
}
const auto suggestedMigrateChunk =
- uassertStatusOK(shardutil::splitChunkAtMultiplePoints(txn,
+ uassertStatusOK(shardutil::splitChunkAtMultiplePoints(opCtx,
chunk->getShardId(),
nss,
manager->getShardKeyPattern(),
@@ -454,7 +457,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
return false;
auto collStatus =
- Grid::get(txn)->catalogClient(txn)->getCollection(txn, manager->getns());
+ Grid::get(opCtx)->catalogClient(opCtx)->getCollection(opCtx, manager->getns());
if (!collStatus.isOK()) {
log() << "Auto-split for " << nss << " failed to load collection metadata"
<< causedBy(redact(collStatus.getStatus()));
@@ -470,7 +473,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
(shouldBalance ? ")" : ", but no migrations allowed)"));
if (!shouldBalance || !suggestedMigrateChunk) {
- reloadChunkManager(txn, nss);
+ reloadChunkManager(opCtx, nss);
return;
}
@@ -482,7 +485,7 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
// We need to use the latest chunk manager (after the split) in order to have the most
// up-to-date view of the chunk we are about to move
- auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(txn, nss));
+ auto scopedCM = uassertStatusOK(ScopedChunkManager::refreshAndGet(opCtx, nss));
auto suggestedChunk = scopedCM.cm()->findIntersectingChunkWithSimpleCollation(
suggestedMigrateChunk->getMin());
@@ -493,9 +496,9 @@ void updateChunkWriteStatsAndSplitIfNeeded(OperationContext* txn,
chunkToMove.setMax(suggestedChunk->getMax());
chunkToMove.setVersion(suggestedChunk->getLastmod());
- uassertStatusOK(configsvr_client::rebalanceChunk(txn, chunkToMove));
+ uassertStatusOK(configsvr_client::rebalanceChunk(opCtx, chunkToMove));
- reloadChunkManager(txn, nss);
+ reloadChunkManager(opCtx, nss);
} catch (const DBException& ex) {
chunk->randomizeBytesWritten();