author    Spencer T Brody <spencer@mongodb.com>  2016-08-30 18:13:32 -0400
committer Spencer T Brody <spencer@mongodb.com>  2016-09-06 12:44:46 -0400
commit    f8872dc852e32ff1cdfcead7b530bc5c350edfcf (patch)
tree      9e30188a1a43c04178360ffd979651692d2fe7dc /src/mongo/db/s/merge_chunks_command.cpp
parent    e0a2e534e4c917d6abe7f6d00c1eaf75fa9f2358 (diff)
download  mongo-f8872dc852e32ff1cdfcead7b530bc5c350edfcf.tar.gz
SERVER-25002 Wire in new _configsvrMergeChunks command so shards no longer run applyOps directly
Diffstat (limited to 'src/mongo/db/s/merge_chunks_command.cpp')
-rw-r--r--  src/mongo/db/s/merge_chunks_command.cpp | 343
1 file changed, 152 insertions(+), 191 deletions(-)
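
Before the diff itself, a condensed sketch of the new commit path this change introduces: instead of hand-building applyOps update/delete operations against config.chunks, the shard now packages the chunk boundaries into a MergeChunkRequest and asks the config server primary to perform the merge. The type and function names below are taken directly from the diff; wrapping them in a standalone helper is an illustrative assumption, not code from the tree.

    // Hypothetical helper distilled from the new code in this commit. All
    // calls used here (MergeChunkRequest, toConfigCommandBSON,
    // ShardRegistry::getConfigShard, Shard::RetryPolicy::kIdempotent)
    // appear verbatim in the diff below.
    Status commitMergeOnConfigServer(OperationContext* txn,
                                     const NamespaceString& nss,
                                     const std::string& shardName,
                                     const OID& collectionEpoch,
                                     const std::vector<BSONObj>& chunkBoundaries) {
        // Package the merge as a _configsvrMergeChunks command rather than a
        // raw applyOps payload against config.chunks.
        MergeChunkRequest request{nss, shardName, collectionEpoch, chunkBoundaries};
        auto configCmdObj = request.toConfigCommandBSON(
            ShardingCatalogClient::kMajorityWriteConcern.toBSON());

        // Send it to the config server primary. kIdempotent lets the shard
        // registry retry on retriable errors, so the caller must be prepared
        // for the merge to have committed even when an error comes back.
        auto swResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
            txn,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            configCmdObj,
            Shard::RetryPolicy::kIdempotent);
        if (!swResponse.isOK()) {
            return swResponse.getStatus();  // no response at all, despite retries
        }
        if (!swResponse.getValue().commandStatus.isOK()) {
            return swResponse.getValue().commandStatus;
        }
        return swResponse.getValue().writeConcernStatus;
    }
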
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 9619e40bdb1..bfc24d2dfc7 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -42,7 +42,9 @@
#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/merge_chunk_request_type.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -55,152 +57,75 @@ using std::vector;
namespace {
-BSONArray buildOpPrecond(const string& ns,
- const ShardId& shardName,
- const ChunkVersion& shardVersion) {
- BSONArrayBuilder preCond;
- BSONObjBuilder condB;
- condB.append("ns", ChunkType::ConfigNS);
- condB.append("q",
- BSON("query" << BSON(ChunkType::ns(ns)) << "orderby"
- << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+bool _checkMetadataForSuccess(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey) {
+ ScopedCollectionMetadata metadataAfterMerge;
{
- BSONObjBuilder resB(condB.subobjStart("res"));
- shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod());
- resB.done();
- }
- preCond.append(condB.obj());
- return preCond.arr();
-}
-
-BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) {
- BSONObjBuilder opB;
-
- // Op basics
- opB.append("op", "u");
- opB.appendBool("b", false); // no upserting
- opB.append("ns", ChunkType::ConfigNS);
-
- // New object
- opB.append("o", mergedChunk.toBSON());
-
- // Query object
- opB.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
-
- return opB.obj();
-}
-
-BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) {
- BSONObjBuilder opB;
-
- // Op basics
- opB.append("op", "d"); // delete
- opB.append("ns", ChunkType::ConfigNS);
-
- opB.append("o", BSON(ChunkType::name(chunkToRemove.getName())));
-
- return opB.obj();
-}
-
-BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& currShardVersion,
- const ChunkVersion& newMergedVersion) {
- BSONObjBuilder logDetailB;
-
- BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged"));
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- for (const ChunkType& chunkToMerge : chunksToMerge) {
- mergedB.append(chunkToMerge.toBSON());
+ // Get collection metadata
+ metadataAfterMerge = CollectionShardingState::get(txn, nss.ns())->getMetadata();
}
- mergedB.done();
-
- currShardVersion.addToBSON(logDetailB, "prevShardVersion");
- newMergedVersion.addToBSON(logDetailB, "mergedVersion");
-
- return logDetailB.obj();
-}
-
-Status runApplyOpsCmd(OperationContext* txn,
- const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& currShardVersion,
- const ChunkVersion& newMergedVersion) {
- BSONArrayBuilder updatesB;
-
- // The chunk we'll be "expanding" is the first chunk
- const ChunkType& firstChunk = chunksToMerge.front();
-
- // Fill in details not tracked by metadata
- ChunkType mergedChunk(firstChunk);
- mergedChunk.setMax(chunksToMerge.back().getMax());
- mergedChunk.setVersion(newMergedVersion);
-
- updatesB.append(buildOpMergeChunk(mergedChunk));
-
- // Don't remove chunk we're expanding
- for (size_t i = 1; i < chunksToMerge.size(); ++i) {
- ChunkType chunkToMerge(chunksToMerge[i]);
- updatesB.append(buildOpRemoveChunk(chunkToMerge));
+ ChunkType chunk;
+ if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) {
+ return false;
}
- BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion);
-
- return grid.catalogClient(txn)->applyChunkOpsDeprecated(
- txn,
- updatesB.arr(),
- preCond,
- firstChunk.getNS(),
- newMergedVersion,
- ShardingCatalogClient::kMajorityWriteConcern,
- repl::ReadConcernLevel::kMajorityReadConcern);
+ return chunk.getMin().woCompare(minKey) == 0 && chunk.getMax().woCompare(maxKey) == 0;
}
-bool mergeChunks(OperationContext* txn,
- const NamespaceString& nss,
- const BSONObj& minKey,
- const BSONObj& maxKey,
- const OID& epoch,
- string* errMsg) {
+Status mergeChunks(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const OID& epoch) {
// Get the distributed lock
+ // TODO(SERVER-25086): Remove distLock acquisition from merge chunk
const string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey
<< " to " << maxKey;
auto scopedDistLock = grid.catalogClient(txn)->distLock(
txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
- *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
- << " to merge chunks in [" << redact(minKey) << "," << redact(maxKey)
- << ") " << causedBy(scopedDistLock.getStatus());
+ std::string errmsg = stream() << "could not acquire collection lock for " << nss.ns()
+ << " to merge chunks in [" << redact(minKey) << ", "
+ << redact(maxKey) << ")"
+ << causedBy(scopedDistLock.getStatus());
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(scopedDistLock.getStatus().code(), errmsg);
}
- ShardingState* gss = ShardingState::get(txn);
+ ShardingState* shardingState = ShardingState::get(txn);
//
// We now have the collection lock, refresh metadata to latest version and sanity check
//
ChunkVersion shardVersion;
- Status status = gss->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+ Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
- if (!status.isOK()) {
- *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
- << nss.ns() << causedBy(redact(status));
+ if (!refreshStatus.isOK()) {
+ std::string errmsg = str::stream()
+ << "could not merge chunks, failed to refresh metadata for " << nss.ns()
+ << causedBy(redact(refreshStatus));
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(refreshStatus.code(), errmsg);
}
if (epoch.isSet() && shardVersion.epoch() != epoch) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed"
- << " since merge was sent"
- << "(sent epoch : " << epoch.toString()
- << ", current epoch : " << shardVersion.epoch().toString() << ")";
-
- warning() << *errMsg;
- return false;
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " has changed"
+ << " since merge was sent"
+ << "(sent epoch : " << epoch.toString()
+ << ", current epoch : " << shardVersion.epoch().toString() << ")";
+
+ warning() << errmsg;
+ return Status(ErrorCodes::StaleEpoch, errmsg);
}
ScopedCollectionMetadata metadata;
@@ -209,52 +134,55 @@ bool mergeChunks(OperationContext* txn,
metadata = CollectionShardingState::get(txn, nss.ns())->getMetadata();
if (!metadata || metadata->getKeyPattern().isEmpty()) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " is not sharded";
+ std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " is not sharded";
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
}
dassert(metadata->getShardVersion().equals(shardVersion));
if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
- *errMsg = stream() << "could not merge chunks, the range "
- << redact(rangeToString(minKey, maxKey)) << " is not valid"
- << " for collection " << nss.ns() << " with key pattern "
- << metadata->getKeyPattern().toString();
+ std::string errmsg = stream() << "could not merge chunks, the range "
+ << redact(rangeToString(minKey, maxKey)) << " is not valid"
+ << " for collection " << nss.ns() << " with key pattern "
+ << metadata->getKeyPattern().toString();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
//
// Get merged chunk information
//
-
std::vector<ChunkType> chunksToMerge;
+ std::vector<BSONObj> chunkBoundaries;
+ chunkBoundaries.push_back(minKey);
ChunkType itChunk;
itChunk.setMin(minKey);
itChunk.setMax(minKey);
itChunk.setNS(nss.ns());
- itChunk.setShard(gss->getShardName());
+ itChunk.setShard(shardingState->getShardName());
while (itChunk.getMax().woCompare(maxKey) < 0 &&
metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
+ chunkBoundaries.push_back(itChunk.getMax());
chunksToMerge.push_back(itChunk);
}
if (chunksToMerge.empty()) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range starting at " << redact(minKey) << " and ending at "
- << redact(maxKey) << " does not belong to shard " << gss->getShardName();
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " range starting at "
+ << redact(minKey) << " and ending at " << redact(maxKey) << " does not belong to shard "
+ << shardingState->getShardName();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
//
@@ -267,12 +195,12 @@ bool mergeChunks(OperationContext* txn,
bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey);
if (!minKeyInRange) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range starting at " << redact(minKey)
- << " does not belong to shard " << gss->getShardName();
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " range starting at "
+ << redact(minKey) << " does not belong to shard " << shardingState->getShardName();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
BSONObj lastDocMin = chunksToMerge.back().getMin();
@@ -281,89 +209,121 @@ bool mergeChunks(OperationContext* txn,
bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0;
if (!maxKeyInRange) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range ending at " << redact(maxKey) << " does not belong to shard "
- << gss->getShardName();
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " range ending at "
+ << redact(maxKey) << " does not belong to shard " << shardingState->getShardName();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0;
bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0;
if (!validRangeStartKey || !validRangeEndKey) {
- *errMsg =
- stream() << "could not merge chunks, collection " << nss.ns()
- << " does not contain a chunk "
- << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
- << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
- << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
-
- warning() << *errMsg;
- return false;
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " does not contain a chunk "
+ << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
+ << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
+ << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
+
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
if (chunksToMerge.size() == 1) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " already contains chunk for "
- << redact(rangeToString(minKey, maxKey));
+ std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " already contains chunk for "
+ << redact(rangeToString(minKey, maxKey));
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
+
// Look for hole in range
for (size_t i = 1; i < chunksToMerge.size(); ++i) {
if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " has a hole in the range "
- << redact(rangeToString(minKey, maxKey)) << " at "
- << redact(rangeToString(chunksToMerge[i - 1].getMax(),
- chunksToMerge[i].getMin()));
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " has a hole in the range "
+ << redact(rangeToString(minKey, maxKey)) << " at "
+ << redact(rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin()));
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
}
- //
- // Run apply ops command
- //
- ChunkVersion mergeVersion = metadata->getCollVersion();
- mergeVersion.incMinor();
-
- // Ensure that the newly applied chunks would result in a correct metadata state
- auto metadataAfterMerge = uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion));
+ {
+ // Ensure that the newly applied chunks would result in a correct metadata state
+ ChunkVersion mergeVersion = metadata->getCollVersion();
+ mergeVersion.incMinor();
- Status applyOpsStatus = runApplyOpsCmd(txn, chunksToMerge, shardVersion, mergeVersion);
- if (!applyOpsStatus.isOK()) {
- warning() << redact(applyOpsStatus);
- return false;
+ uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion));
}
//
- // Install merged chunk metadata
+ // Run _configsvrMergeChunks.
//
+ MergeChunkRequest request{
+ nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries};
- {
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
-
- auto css = CollectionShardingState::get(txn, nss);
- css->refreshMetadata(txn, std::move(metadataAfterMerge));
- }
+ auto configCmdObj =
+ request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ auto cmdResponseStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ configCmdObj,
+ Shard::RetryPolicy::kIdempotent);
//
- // Log change
+ // Refresh metadata to pick up new chunk definitions (regardless of the results returned from
+ // running _configsvrMergeChunk).
//
+ {
+ ChunkVersion shardVersionAfterMerge;
+ refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge);
+
+ if (!refreshStatus.isOK()) {
+ std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk ["
+ << redact(minKey) << "," << redact(maxKey) << ") "
+ << redact(refreshStatus);
- BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion);
+ warning() << errmsg;
+ return Status(refreshStatus.code(), errmsg);
+ }
+ }
- grid.catalogClient(txn)->logChange(
- txn, "merge", nss.ns(), mergeLogEntry, ShardingCatalogClient::kMajorityWriteConcern);
+ // If we failed to get any response from the config server at all, despite retries, then we
+ // should just go ahead and fail the whole operation.
+ if (!cmdResponseStatus.isOK()) {
+ return cmdResponseStatus.getStatus();
+ }
+
+ // If _configsvrMergeChunk returned an error, look at this shard's metadata to determine if
+ // the merge actually did happen. This can happen if there's a network error getting the
+ // response from the first call to _configsvrMergeChunk, but it actually succeeds, thus the
+ // automatic retry fails with a precondition violation, for example.
+ auto commandStatus = std::move(cmdResponseStatus.getValue().commandStatus);
+ auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus);
+
+ if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
+ _checkMetadataForSuccess(txn, nss, minKey, maxKey)) {
+
+ LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey)
+ << ") has already been committed.";
+ } else if (!commandStatus.isOK()) {
+ std::string errmsg = str::stream() << "Failed to commit chunk merge"
+ << causedBy(redact(commandStatus));
+ return Status(commandStatus.code(), errmsg);
+ } else if (!writeConcernStatus.isOK()) {
+ std::string errmsg = str::stream() << "Failed to commit chunk merge"
+ << causedBy(redact(writeConcernStatus));
+ return Status(writeConcernStatus.code(), errmsg);
+ }
- return true;
+ return Status::OK();
}
class MergeChunksCommand : public Command {
@@ -492,7 +452,8 @@ public:
return false;
}
- return mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch, &errmsg);
+ auto mergeStatus = mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch);
+ return appendCommandStatus(result, mergeStatus);
}
} mergeChunksCmd;
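
Two details of the new flow deserve a note. First, because the config command is sent with RetryPolicy::kIdempotent, a merge can commit on the config server even though the shard receives an error reply (for example, the first attempt succeeds but its response is lost, and the automatic retry then fails its precondition). The diff handles this by re-checking the shard's own refreshed metadata in _checkMetadataForSuccess; below is a simplified sketch of that check, using only calls visible in the hunks above (the name is the diff's own, the surrounding scaffolding is assumed).

    // Returns true if the shard's metadata already contains a single chunk
    // spanning exactly [minKey, maxKey), i.e. the merge already committed.
    bool _checkMetadataForSuccess(OperationContext* txn,
                                  const NamespaceString& nss,
                                  const BSONObj& minKey,
                                  const BSONObj& maxKey) {
        ScopedCollectionMetadata metadataAfterMerge;
        {
            // Hold the collection lock in MODE_IS just long enough to
            // snapshot the sharding metadata.
            AutoGetCollection autoColl(txn, nss, MODE_IS);
            metadataAfterMerge =
                CollectionShardingState::get(txn, nss.ns())->getMetadata();
        }

        ChunkType chunk;
        if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) {
            return false;  // this shard owns no chunk at or after minKey
        }
        // The merge committed iff one chunk now covers the whole range.
        return chunk.getMin().woCompare(minKey) == 0 &&
            chunk.getMax().woCompare(maxKey) == 0;
    }

Second, mergeChunks now returns a Status instead of a bool with an errMsg out-parameter, which lets the command handler report the outcome with a single appendCommandStatus(result, mergeStatus) call, as the last hunk shows.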