author     Spencer T Brody <spencer@mongodb.com>  2016-08-30 18:13:32 -0400
committer  Spencer T Brody <spencer@mongodb.com>  2016-09-06 12:44:46 -0400
commit     f8872dc852e32ff1cdfcead7b530bc5c350edfcf (patch)
tree       9e30188a1a43c04178360ffd979651692d2fe7dc /src
parent     e0a2e534e4c917d6abe7f6d00c1eaf75fa9f2358 (diff)
download   mongo-f8872dc852e32ff1cdfcead7b530bc5c350edfcf.tar.gz
SERVER-25002 Wire in new _configsvrMergeChunks command so shards no longer run applyOps directly
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/SConscript                                          |   1
-rw-r--r--  src/mongo/db/s/merge_chunks_command.cpp                            | 343
-rw-r--r--  src/mongo/db/s/split_chunk_command.cpp                             |  44
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp     | 136
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h       |   5
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp |  58
-rw-r--r--  src/mongo/s/request_types/merge_chunk_request_type.cpp            |   6
-rw-r--r--  src/mongo/s/request_types/merge_chunk_request_type.h              |   4
-rw-r--r--  src/mongo/s/request_types/split_chunk_request_type.cpp            |  10
-rw-r--r--  src/mongo/s/request_types/split_chunk_request_type.h              |   4
10 files changed, 317 insertions(+), 294 deletions(-)
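Before the per-file diffs, here is the rough shape of the command a shard now sends to the config server, inferred from MergeChunkRequest's constructor and toConfigCommandBSON in this patch. Only the "shard" field name (kShardName) is visible in the hunks below; the command name spelling and the other field names are assumptions.

// Sketch (not verbatim wire format) of the new config server command.
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/oid.h"

mongo::BSONObj exampleConfigsvrMergeChunkCommand() {
    using namespace mongo;
    BSONObjBuilder cmd;
    cmd.append("_configsvrMergeChunk", "TestDB.TestColl");  // target namespace
    cmd.append("collEpoch", OID::gen());                    // assumed field name
    cmd.append("chunkBoundaries",                           // assumed field name
               BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)));
    cmd.append("shard", "shard0000");                       // kShardName below
    cmd.append("writeConcern", BSON("w" << "majority"));    // via toConfigCommandBSON
    return cmd.obj();
}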
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index bbf906085bf..4bccdd47e45 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -89,6 +89,7 @@ env.Library(
'config/configsvr_add_shard_to_zone_command.cpp',
'config/configsvr_commit_chunk_migration_command.cpp',
'config/configsvr_control_balancer_command.cpp',
+ 'config/configsvr_merge_chunk_command.cpp',
'config/configsvr_move_chunk_command.cpp',
'config/configsvr_remove_shard_from_zone_command.cpp',
'config/configsvr_set_feature_compatibility_version_command.cpp',
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 9619e40bdb1..bfc24d2dfc7 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -42,7 +42,9 @@
#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/merge_chunk_request_type.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -55,152 +57,75 @@ using std::vector;
namespace {
-BSONArray buildOpPrecond(const string& ns,
- const ShardId& shardName,
- const ChunkVersion& shardVersion) {
- BSONArrayBuilder preCond;
- BSONObjBuilder condB;
- condB.append("ns", ChunkType::ConfigNS);
- condB.append("q",
- BSON("query" << BSON(ChunkType::ns(ns)) << "orderby"
- << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+bool _checkMetadataForSuccess(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey) {
+ ScopedCollectionMetadata metadataAfterMerge;
{
- BSONObjBuilder resB(condB.subobjStart("res"));
- shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod());
- resB.done();
- }
- preCond.append(condB.obj());
- return preCond.arr();
-}
-
-BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) {
- BSONObjBuilder opB;
-
- // Op basics
- opB.append("op", "u");
- opB.appendBool("b", false); // no upserting
- opB.append("ns", ChunkType::ConfigNS);
-
- // New object
- opB.append("o", mergedChunk.toBSON());
-
- // Query object
- opB.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
-
- return opB.obj();
-}
-
-BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) {
- BSONObjBuilder opB;
-
- // Op basics
- opB.append("op", "d"); // delete
- opB.append("ns", ChunkType::ConfigNS);
-
- opB.append("o", BSON(ChunkType::name(chunkToRemove.getName())));
-
- return opB.obj();
-}
-
-BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& currShardVersion,
- const ChunkVersion& newMergedVersion) {
- BSONObjBuilder logDetailB;
-
- BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged"));
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- for (const ChunkType& chunkToMerge : chunksToMerge) {
- mergedB.append(chunkToMerge.toBSON());
+ // Get collection metadata
+ metadataAfterMerge = CollectionShardingState::get(txn, nss.ns())->getMetadata();
}
- mergedB.done();
-
- currShardVersion.addToBSON(logDetailB, "prevShardVersion");
- newMergedVersion.addToBSON(logDetailB, "mergedVersion");
-
- return logDetailB.obj();
-}
-
-Status runApplyOpsCmd(OperationContext* txn,
- const std::vector<ChunkType>& chunksToMerge,
- const ChunkVersion& currShardVersion,
- const ChunkVersion& newMergedVersion) {
- BSONArrayBuilder updatesB;
-
- // The chunk we'll be "expanding" is the first chunk
- const ChunkType& firstChunk = chunksToMerge.front();
-
- // Fill in details not tracked by metadata
- ChunkType mergedChunk(firstChunk);
- mergedChunk.setMax(chunksToMerge.back().getMax());
- mergedChunk.setVersion(newMergedVersion);
-
- updatesB.append(buildOpMergeChunk(mergedChunk));
-
- // Don't remove chunk we're expanding
- for (size_t i = 1; i < chunksToMerge.size(); ++i) {
- ChunkType chunkToMerge(chunksToMerge[i]);
- updatesB.append(buildOpRemoveChunk(chunkToMerge));
+ ChunkType chunk;
+ if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) {
+ return false;
}
- BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion);
-
- return grid.catalogClient(txn)->applyChunkOpsDeprecated(
- txn,
- updatesB.arr(),
- preCond,
- firstChunk.getNS(),
- newMergedVersion,
- ShardingCatalogClient::kMajorityWriteConcern,
- repl::ReadConcernLevel::kMajorityReadConcern);
+ return chunk.getMin().woCompare(minKey) == 0 && chunk.getMax().woCompare(maxKey) == 0;
}
-bool mergeChunks(OperationContext* txn,
- const NamespaceString& nss,
- const BSONObj& minKey,
- const BSONObj& maxKey,
- const OID& epoch,
- string* errMsg) {
+Status mergeChunks(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const OID& epoch) {
// Get the distributed lock
+ // TODO(SERVER-25086): Remove distLock acquisition from merge chunk
const string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey
<< " to " << maxKey;
auto scopedDistLock = grid.catalogClient(txn)->distLock(
txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
if (!scopedDistLock.isOK()) {
- *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
- << " to merge chunks in [" << redact(minKey) << "," << redact(maxKey)
- << ") " << causedBy(scopedDistLock.getStatus());
+ std::string errmsg = stream() << "could not acquire collection lock for " << nss.ns()
+ << " to merge chunks in [" << redact(minKey) << ", "
+ << redact(maxKey) << ")"
+ << causedBy(scopedDistLock.getStatus());
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(scopedDistLock.getStatus().code(), errmsg);
}
- ShardingState* gss = ShardingState::get(txn);
+ ShardingState* shardingState = ShardingState::get(txn);
//
// We now have the collection lock, refresh metadata to latest version and sanity check
//
ChunkVersion shardVersion;
- Status status = gss->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+ Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
- if (!status.isOK()) {
- *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
- << nss.ns() << causedBy(redact(status));
+ if (!refreshStatus.isOK()) {
+ std::string errmsg = str::stream()
+ << "could not merge chunks, failed to refresh metadata for " << nss.ns()
+ << causedBy(redact(refreshStatus));
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(refreshStatus.code(), errmsg);
}
if (epoch.isSet() && shardVersion.epoch() != epoch) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed"
- << " since merge was sent"
- << "(sent epoch : " << epoch.toString()
- << ", current epoch : " << shardVersion.epoch().toString() << ")";
-
- warning() << *errMsg;
- return false;
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " has changed"
+ << " since merge was sent"
+ << "(sent epoch : " << epoch.toString()
+ << ", current epoch : " << shardVersion.epoch().toString() << ")";
+
+ warning() << errmsg;
+ return Status(ErrorCodes::StaleEpoch, errmsg);
}
ScopedCollectionMetadata metadata;
@@ -209,52 +134,55 @@ bool mergeChunks(OperationContext* txn,
metadata = CollectionShardingState::get(txn, nss.ns())->getMetadata();
if (!metadata || metadata->getKeyPattern().isEmpty()) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " is not sharded";
+ std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " is not sharded";
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
}
dassert(metadata->getShardVersion().equals(shardVersion));
if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
- *errMsg = stream() << "could not merge chunks, the range "
- << redact(rangeToString(minKey, maxKey)) << " is not valid"
- << " for collection " << nss.ns() << " with key pattern "
- << metadata->getKeyPattern().toString();
+ std::string errmsg = stream() << "could not merge chunks, the range "
+ << redact(rangeToString(minKey, maxKey)) << " is not valid"
+ << " for collection " << nss.ns() << " with key pattern "
+ << metadata->getKeyPattern().toString();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
//
// Get merged chunk information
//
-
std::vector<ChunkType> chunksToMerge;
+ std::vector<BSONObj> chunkBoundaries;
+ chunkBoundaries.push_back(minKey);
ChunkType itChunk;
itChunk.setMin(minKey);
itChunk.setMax(minKey);
itChunk.setNS(nss.ns());
- itChunk.setShard(gss->getShardName());
+ itChunk.setShard(shardingState->getShardName());
while (itChunk.getMax().woCompare(maxKey) < 0 &&
metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
+ chunkBoundaries.push_back(itChunk.getMax());
chunksToMerge.push_back(itChunk);
}
if (chunksToMerge.empty()) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range starting at " << redact(minKey) << " and ending at "
- << redact(maxKey) << " does not belong to shard " << gss->getShardName();
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " range starting at "
+ << redact(minKey) << " and ending at " << redact(maxKey) << " does not belong to shard "
+ << shardingState->getShardName();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
//
@@ -267,12 +195,12 @@ bool mergeChunks(OperationContext* txn,
bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey);
if (!minKeyInRange) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range starting at " << redact(minKey)
- << " does not belong to shard " << gss->getShardName();
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " range starting at "
+ << redact(minKey) << " does not belong to shard " << shardingState->getShardName();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
BSONObj lastDocMin = chunksToMerge.back().getMin();
@@ -281,89 +209,121 @@ bool mergeChunks(OperationContext* txn,
bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0;
if (!maxKeyInRange) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " range ending at " << redact(maxKey) << " does not belong to shard "
- << gss->getShardName();
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " range ending at "
+ << redact(maxKey) << " does not belong to shard " << shardingState->getShardName();
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0;
bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0;
if (!validRangeStartKey || !validRangeEndKey) {
- *errMsg =
- stream() << "could not merge chunks, collection " << nss.ns()
- << " does not contain a chunk "
- << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
- << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
- << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
-
- warning() << *errMsg;
- return false;
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " does not contain a chunk "
+ << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
+ << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
+ << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
+
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
if (chunksToMerge.size() == 1) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " already contains chunk for "
- << redact(rangeToString(minKey, maxKey));
+ std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+ << " already contains chunk for "
+ << redact(rangeToString(minKey, maxKey));
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
+
// Look for hole in range
for (size_t i = 1; i < chunksToMerge.size(); ++i) {
if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
- *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
- << " has a hole in the range "
- << redact(rangeToString(minKey, maxKey)) << " at "
- << redact(rangeToString(chunksToMerge[i - 1].getMax(),
- chunksToMerge[i].getMin()));
+ std::string errmsg = stream()
+ << "could not merge chunks, collection " << nss.ns() << " has a hole in the range "
+ << redact(rangeToString(minKey, maxKey)) << " at "
+ << redact(rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin()));
- warning() << *errMsg;
- return false;
+ warning() << errmsg;
+ return Status(ErrorCodes::IllegalOperation, errmsg);
}
}
- //
- // Run apply ops command
- //
- ChunkVersion mergeVersion = metadata->getCollVersion();
- mergeVersion.incMinor();
-
- // Ensure that the newly applied chunks would result in a correct metadata state
- auto metadataAfterMerge = uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion));
+ {
+ // Ensure that the newly applied chunks would result in a correct metadata state
+ ChunkVersion mergeVersion = metadata->getCollVersion();
+ mergeVersion.incMinor();
- Status applyOpsStatus = runApplyOpsCmd(txn, chunksToMerge, shardVersion, mergeVersion);
- if (!applyOpsStatus.isOK()) {
- warning() << redact(applyOpsStatus);
- return false;
+ uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion));
}
//
- // Install merged chunk metadata
+ // Run _configsvrMergeChunks.
//
+ MergeChunkRequest request{
+ nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries};
- {
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
-
- auto css = CollectionShardingState::get(txn, nss);
- css->refreshMetadata(txn, std::move(metadataAfterMerge));
- }
+ auto configCmdObj =
+ request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ auto cmdResponseStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ configCmdObj,
+ Shard::RetryPolicy::kIdempotent);
//
- // Log change
+ // Refresh metadata to pick up new chunk definitions (regardless of the results returned from
+ // running _configsvrMergeChunk).
//
+ {
+ ChunkVersion shardVersionAfterMerge;
+ refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge);
+
+ if (!refreshStatus.isOK()) {
+ std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk ["
+ << redact(minKey) << "," << redact(maxKey) << ") "
+ << redact(refreshStatus);
- BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion);
+ warning() << errmsg;
+ return Status(refreshStatus.code(), errmsg);
+ }
+ }
- grid.catalogClient(txn)->logChange(
- txn, "merge", nss.ns(), mergeLogEntry, ShardingCatalogClient::kMajorityWriteConcern);
+ // If we failed to get any response from the config server at all, despite retries, then we
+ // should just go ahead and fail the whole operation.
+ if (!cmdResponseStatus.isOK()) {
+ return cmdResponseStatus.getStatus();
+ }
+
+ // If _configsvrMergeChunk returned an error, look at this shard's metadata to determine if
+ // the merge actually did happen. This can happen if there's a network error getting the
+ // response from the first call to _configsvrMergeChunk, but it actually succeeds, thus the
+ // automatic retry fails with a precondition violation, for example.
+ auto commandStatus = std::move(cmdResponseStatus.getValue().commandStatus);
+ auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus);
+
+ if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
+ _checkMetadataForSuccess(txn, nss, minKey, maxKey)) {
+
+ LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey)
+ << ") has already been committed.";
+ } else if (!commandStatus.isOK()) {
+ std::string errmsg = str::stream() << "Failed to commit chunk merge"
+ << causedBy(redact(commandStatus));
+ return Status(commandStatus.code(), errmsg);
+ } else if (!writeConcernStatus.isOK()) {
+ std::string errmsg = str::stream() << "Failed to commit chunk merge"
+ << causedBy(redact(writeConcernStatus));
+ return Status(writeConcernStatus.code(), errmsg);
+ }
- return true;
+ return Status::OK();
}
class MergeChunksCommand : public Command {
@@ -492,7 +452,8 @@ public:
return false;
}
- return mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch, &errmsg);
+ auto mergeStatus = mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch);
+ return appendCommandStatus(result, mergeStatus);
}
} mergeChunksCmd;
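The file above replaces the shard-local applyOps commit with a "send to config server, then reconcile" sequence. The sketch below compresses the new code from this hunk into one function for readability; it assumes the surrounding mongo headers and elides the writeConcernStatus handling, so it is a paraphrase rather than a drop-in replacement.

// Condensed paraphrase of the new mergeChunks commit path (sketch).
Status commitMergeOnConfigsvr(OperationContext* txn,
                              const NamespaceString& nss,
                              const BSONObj& minKey,
                              const BSONObj& maxKey,
                              const ChunkVersion& shardVersion,
                              const std::vector<BSONObj>& chunkBoundaries) {
    MergeChunkRequest request{
        nss, ShardingState::get(txn)->getShardName(), shardVersion.epoch(), chunkBoundaries};

    auto cmdResponseStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
        txn,
        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
        "admin",
        request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()),
        Shard::RetryPolicy::kIdempotent);

    // Refresh unconditionally so this shard observes whatever the config
    // server actually committed, even if the response was lost.
    ChunkVersion shardVersionAfterMerge;
    Status refreshStatus =
        ShardingState::get(txn)->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge);
    if (!refreshStatus.isOK())
        return refreshStatus;
    if (!cmdResponseStatus.isOK())
        return cmdResponseStatus.getStatus();  // no response at all, despite retries

    auto commandStatus = std::move(cmdResponseStatus.getValue().commandStatus);
    if (!commandStatus.isOK() && _checkMetadataForSuccess(txn, nss, minKey, maxKey)) {
        return Status::OK();  // a retried attempt had already committed the merge
    }
    return commandStatus;
}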
diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp
index ae37a92af65..c28ea1a5cec 100644
--- a/src/mongo/db/s/split_chunk_command.cpp
+++ b/src/mongo/db/s/split_chunk_command.cpp
@@ -100,10 +100,10 @@ bool checkIfSingleDoc(OperationContext* txn,
// using the specified splitPoints. Returns false if the metadata's chunks don't match
// the new chunk boundaries exactly.
//
-bool checkMetadataForSuccess(OperationContext* txn,
- const NamespaceString& nss,
- const ChunkRange& chunkRange,
- const std::vector<BSONObj>& splitKeys) {
+bool _checkMetadataForSuccess(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const std::vector<BSONObj>& splitKeys) {
ScopedCollectionMetadata metadataAfterSplit;
{
AutoGetCollection autoColl(txn, nss, MODE_IS);
@@ -118,13 +118,9 @@ bool checkMetadataForSuccess(OperationContext* txn,
ChunkType nextChunk;
for (const auto& endKey : newChunkBounds) {
- log() << "checking metadataAfterSplit for new chunk boundaries [" << redact(startKey) << ","
- << redact(endKey) << ")";
// Check that all new chunks fit the new chunk boundaries
if (!metadataAfterSplit->getNextChunk(startKey, &nextChunk) ||
nextChunk.getMax().woCompare(endKey)) {
- log() << "ERROR, found [" << redact(startKey) << "," << redact(nextChunk.getMax())
- << ")";
return false;
}
@@ -254,7 +250,7 @@ public:
//
// Lock the collection's metadata and get highest version for the current shard
- // TODO(SERVER-25086): Remove distLock aquisition from split chunk
+ // TODO(SERVER-25086): Remove distLock acquisition from split chunk
//
const string whyMessage(str::stream() << "splitting chunk [" << min << ", " << max
<< ") in "
@@ -350,10 +346,10 @@ public:
log() << "splitChunk accepted at version " << shardVersion;
auto request = SplitChunkRequest(
- nss, expectedCollectionVersion.epoch(), chunkRange, splitKeys, shardName);
+ nss, shardName, expectedCollectionVersion.epoch(), chunkRange, splitKeys);
- auto configCmdObj = request.toConfigCommandBSON(
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ auto configCmdObj =
+ request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
auto cmdResponseStatus =
Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
@@ -366,17 +362,23 @@ public:
//
// Refresh chunk metadata regardless of whether or not the split succeeded
//
- ChunkVersion shardVersionAfterSplit;
- refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterSplit);
+ {
+ ChunkVersion shardVersionAfterSplit;
+ refreshStatus =
+ shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterSplit);
- if (!refreshStatus.isOK()) {
- errmsg = str::stream() << "failed to refresh metadata for split chunk [" << redact(min)
- << "," << redact(max) << ") " << causedBy(redact(refreshStatus));
+ if (!refreshStatus.isOK()) {
+ errmsg = str::stream() << "failed to refresh metadata for split chunk ["
+ << redact(min) << "," << redact(max) << ") "
+ << causedBy(redact(refreshStatus));
- warning() << errmsg;
- return false;
+ warning() << errmsg;
+ return false;
+ }
}
+ // If we failed to get any response from the config server at all, despite retries, then we
+ // should just go ahead and fail the whole operation.
if (!cmdResponseStatus.isOK())
return appendCommandStatus(result, cmdResponseStatus.getStatus());
@@ -400,13 +402,13 @@ public:
}
//
- // If _configsvrSplitChunk returned an error, look at this shard's metadata to deterine if
+ // If _configsvrSplitChunk returned an error, look at this shard's metadata to determine if
// the split actually did happen. This can happen if there's a network error getting the
// response from the first call to _configsvrSplitChunk, but it actually succeeds, thus the
// automatic retry fails with a precondition violation, for example.
//
if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
- checkMetadataForSuccess(txn, nss, chunkRange, splitKeys)) {
+ _checkMetadataForSuccess(txn, nss, chunkRange, splitKeys)) {
LOG(1) << "splitChunk [" << redact(min) << "," << redact(max)
<< ") has already been committed.";
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 48f0b6b99be..b27c588fa2d 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -102,7 +102,7 @@ const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::
/**
* Append min, max and version information from chunk to the buffer for logChange purposes.
*/
-void appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
+void _appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
BSONObjBuilder bb(*b);
bb.append(ChunkType::min(), chunk.getMin());
bb.append(ChunkType::max(), chunk.getMax());
@@ -228,6 +228,69 @@ StatusWith<ChunkRange> includeFullShardKey(OperationContext* txn,
shardKeyPattern.extendRangeBound(range.getMax(), false));
}
+BSONArray _buildMergeChunksApplyOpsUpdates(const std::vector<ChunkType>& chunksToMerge,
+ const ChunkVersion& mergeVersion) {
+ BSONArrayBuilder updates;
+
+ // Build an update operation to expand the first chunk into the newly merged chunk
+ {
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false); // no upsert
+ op.append("ns", ChunkType::ConfigNS);
+
+ // expand first chunk into newly merged chunk
+ ChunkType mergedChunk(chunksToMerge.front());
+ mergedChunk.setMax(chunksToMerge.back().getMax());
+
+ // fill in additional details for sending through applyOps
+ mergedChunk.setVersion(mergeVersion);
+
+ // add the new chunk information as the update object
+ op.append("o", mergedChunk.toBSON());
+
+ // query object
+ op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
+
+ updates.append(op.obj());
+ }
+
+ // Build update operations to delete the rest of the chunks to be merged. Remember not
+ // to delete the first chunk we're expanding
+ for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+ BSONObjBuilder op;
+ op.append("op", "d");
+ op.append("ns", ChunkType::ConfigNS);
+
+ op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
+
+ updates.append(op.obj());
+ }
+
+ return updates.arr();
+}
+
+BSONArray _buildMergeChunksApplyOpsPrecond(const std::vector<ChunkType>& chunksToMerge,
+ const ChunkVersion& collVersion) {
+ BSONArrayBuilder preCond;
+
+ for (auto chunk : chunksToMerge) {
+ BSONObjBuilder b;
+ b.append("ns", ChunkType::ConfigNS);
+ b.append(
+ "q",
+ BSON("query" << BSON(ChunkType::ns(chunk.getNS()) << ChunkType::min(chunk.getMin())
+ << ChunkType::max(chunk.getMax()))
+ << "orderby"
+ << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ b.append("res",
+ BSON(ChunkType::DEPRECATED_epoch(collVersion.epoch())
+ << ChunkType::shard(chunk.getShard().toString())));
+ preCond.append(b.obj());
+ }
+ return preCond.arr();
+}
+
} // namespace
ShardingCatalogManagerImpl::ShardingCatalogManagerImpl(
@@ -1212,8 +1275,8 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
}
if (newChunks.size() == 2) {
- _appendShortVersion(logDetail.subobjStart("left"), newChunks[0]);
- _appendShortVersion(logDetail.subobjStart("right"), newChunks[1]);
+ _appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]);
+ _appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]);
grid.catalogClient(txn)->logChange(
txn, "split", ns.ns(), logDetail.obj(), WriteConcernOptions());
@@ -1227,7 +1290,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
chunkDetail.appendElements(beforeDetailObj);
chunkDetail.append("number", i + 1);
chunkDetail.append("of", newChunksSize);
- _appendShortVersion(chunkDetail.subobjStart("chunk"), newChunks[i]);
+ _appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]);
grid.catalogClient(txn)->logChange(
txn, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions());
@@ -1305,62 +1368,14 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
ChunkVersion mergeVersion = collVersion;
mergeVersion.incMinor();
- BSONArrayBuilder updates;
-
- // Build an update operation to expand the first chunk into the newly merged chunk
- {
- BSONObjBuilder op;
- op.append("op", "u");
- op.appendBool("b", false);
- op.append("ns", ChunkType::ConfigNS);
-
- // expand first chunk into newly merged chunk
- ChunkType mergedChunk(chunksToMerge.front());
- mergedChunk.setMax(chunksToMerge.back().getMax());
-
- // fill in additional details for sending through applyOps
- mergedChunk.setVersion(mergeVersion);
-
- // add the new chunk information as the update object
- op.append("o", mergedChunk.toBSON());
-
- // query object
- op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
-
- updates.append(op.obj());
- }
-
- // Build update operations to delete the rest of the chunks to be merged. Remember not
- // to delete the first chunk we're expanding
- for (size_t i = 1; i < chunksToMerge.size(); ++i) {
- BSONObjBuilder op;
- op.append("op", "d");
- op.append("ns", ChunkType::ConfigNS);
-
- op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
-
- updates.append(op.obj());
- }
-
- BSONArrayBuilder preCond;
- {
- BSONObjBuilder b;
- b.append("ns", ChunkType::ConfigNS);
- b.append("q",
- BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby"
- << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
- {
- BSONObjBuilder bb(b.subobjStart("res"));
- collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod());
- }
- preCond.append(b.obj());
- }
+ auto updates = _buildMergeChunksApplyOpsUpdates(chunksToMerge, mergeVersion);
+ auto preCond = _buildMergeChunksApplyOpsPrecond(chunksToMerge, collVersion);
// apply the batch of updates to remote and local metadata
Status applyOpsStatus =
grid.catalogClient(txn)->applyChunkOpsDeprecated(txn,
- updates.arr(),
- preCond.arr(),
+ updates,
+ preCond,
ns.ns(),
mergeVersion,
WriteConcernOptions(),
@@ -1386,15 +1401,6 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
return applyOpsStatus;
}
-void ShardingCatalogManagerImpl::_appendShortVersion(BufBuilder& b, const ChunkType& chunk) {
- BSONObjBuilder bb(b);
- bb.append(ChunkType::min(), chunk.getMin());
- bb.append(ChunkType::max(), chunk.getMax());
- if (chunk.isVersionSet())
- chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod());
- bb.done();
-}
-
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
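A minimal usage sketch of the two helpers factored out above. commitChunkMerge hands the two arrays to applyChunkOpsDeprecated as shown in the hunk; the "preCondition" pairing below reflects the applyOps command interface and is included for illustration only.

// Sketch: how the builders' output pairs into an applyOps-style command
// (assumes the anonymous-namespace helpers above are visible).
BSONObj exampleApplyOpsForMerge(const std::vector<ChunkType>& chunksToMerge,
                                const ChunkVersion& collVersion) {
    ChunkVersion mergeVersion = collVersion;
    mergeVersion.incMinor();  // merges bump the collection's minor version
    return BSON("applyOps" << _buildMergeChunksApplyOpsUpdates(chunksToMerge, mergeVersion)
                           << "preCondition"
                           << _buildMergeChunksApplyOpsPrecond(chunksToMerge, collVersion));
}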
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index aa4c437151b..5294e31ace7 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -195,11 +195,6 @@ private:
StatusWith<std::vector<ShardType>> _getAllShardingUnawareShards(OperationContext* txn);
/**
- * Append min, max and version information from chunk to the buffer for logChange purposes.
- */
- void _appendShortVersion(BufBuilder& b, const ChunkType& chunk);
-
- /**
* Callback function used when rescheduling an addShard task after the first attempt failed.
* Checks if the callback has been canceled, and if not, proceeds to call
* _scheduleAddShardTask.
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
index 23e3803379d..f3ae19faa30 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
@@ -361,5 +361,63 @@ TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) {
ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus);
}
+TEST_F(MergeChunkTest, MergeAlreadyHappenedFailsPrecondition) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunk to be merged
+ auto chunk2(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ // second chunk boundaries
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkMax);
+
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+ ChunkType mergedChunk(chunk);
+ auto mergedVersion = chunk.getVersion();
+ mergedVersion.incMinor();
+ mergedChunk.setVersion(mergedVersion);
+ mergedChunk.setMax(chunkMax);
+
+ setupChunks({mergedChunk});
+
+ ASSERT_EQ(ErrorCodes::BadValue,
+ catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ chunkBoundaries,
+ "shard0000"));
+
+ // Verify that no change to config.chunks happened.
+ auto findResponse = uassertStatusOK(
+ getConfigShard()->exhaustiveFindOnConfig(operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << "TestDB.TestColl"),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ boost::none));
+
+ const auto& chunksVector = findResponse.docs;
+
+ // There should be exactly one chunk left in the collection
+ ASSERT_EQ(1u, chunksVector.size());
+
+ // MergedChunk should have range [chunkMin, chunkMax]
+ ChunkType foundChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front()));
+ ASSERT_BSONOBJ_EQ(mergedChunk.toBSON(), foundChunk.toBSON());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp
index 508bf9b247d..103c5087368 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp
@@ -48,9 +48,9 @@ const char kShardName[] = "shard";
} // unnamed namespace
MergeChunkRequest::MergeChunkRequest(NamespaceString nss,
+ string shardName,
OID epoch,
- vector<BSONObj> chunkBoundaries,
- string shardName)
+ vector<BSONObj> chunkBoundaries)
: _nss(std::move(nss)),
_epoch(std::move(epoch)),
_chunkBoundaries(std::move(chunkBoundaries)),
@@ -94,7 +94,7 @@ StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BS
}
auto request = MergeChunkRequest(
- NamespaceString(ns), std::move(epoch), std::move(chunkBoundaries), std::move(shardName));
+ NamespaceString(ns), std::move(shardName), std::move(epoch), std::move(chunkBoundaries));
Status validationStatus = request._validate();
if (!validationStatus.isOK()) {
return validationStatus;
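With the reorder above, shardName now precedes epoch in the constructor, matching the parse order. A round-trip sketch under the new signature (placeholder values; assumes a mongo test environment):

// Build a request, serialize it for the config server, and parse it back.
MergeChunkRequest request(NamespaceString("TestDB.TestColl"),
                          "shard0000",  // shardName, now the second argument
                          OID::gen(),
                          {BSON("a" << 1), BSON("a" << 5), BSON("a" << 10)});
auto cmdObj =
    request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
auto parsed = MergeChunkRequest::parseFromConfigCommand(cmdObj);  // StatusWith
invariant(parsed.isOK());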
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h
index 5aa72ab6356..31cc6e69b19 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.h
+++ b/src/mongo/s/request_types/merge_chunk_request_type.h
@@ -44,9 +44,9 @@ namespace mongo {
class MergeChunkRequest {
public:
MergeChunkRequest(NamespaceString nss,
+ std::string shardName,
OID epoch,
- std::vector<BSONObj> chunkBoundaries,
- std::string shardName);
+ std::vector<BSONObj> chunkBoundaries);
/**
* Parses the provided BSON content as the internal _configsvrMergeChunk command, and if
diff --git a/src/mongo/s/request_types/split_chunk_request_type.cpp b/src/mongo/s/request_types/split_chunk_request_type.cpp
index 78c2c478d0c..a3ca50c7f32 100644
--- a/src/mongo/s/request_types/split_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/split_chunk_request_type.cpp
@@ -48,10 +48,10 @@ const char kShardName[] = "shard";
} // unnamed namespace
SplitChunkRequest::SplitChunkRequest(NamespaceString nss,
+ string shardName,
OID epoch,
ChunkRange chunkRange,
- vector<BSONObj> splitPoints,
- string shardName)
+ vector<BSONObj> splitPoints)
: _nss(std::move(nss)),
_epoch(std::move(epoch)),
_chunkRange(std::move(chunkRange)),
@@ -102,10 +102,10 @@ StatusWith<SplitChunkRequest> SplitChunkRequest::parseFromConfigCommand(const BS
}
auto request = SplitChunkRequest(NamespaceString(ns),
+ std::move(shardName),
std::move(epoch),
std::move(chunkRangeStatus.getValue()),
- std::move(splitPoints),
- std::move(shardName));
+ std::move(splitPoints));
Status validationStatus = request._validate();
if (!validationStatus.isOK()) {
return validationStatus;
@@ -160,7 +160,7 @@ const string& SplitChunkRequest::getShardName() const {
Status SplitChunkRequest::_validate() {
if (!getNamespace().isValid()) {
return Status(ErrorCodes::InvalidNamespace,
- str::stream() << "invalid namespaace '" << _nss.ns()
+ str::stream() << "invalid namespace '" << _nss.ns()
<< "' specified for request");
}
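SplitChunkRequest gets the same treatment: shardName moves ahead of epoch so both request types share an argument order. A call under the new signature (placeholder values; collectionVersion is assumed in scope):

// Sketch: constructing a split request with the reordered arguments.
SplitChunkRequest request(NamespaceString("TestDB.TestColl"),
                          "shard0000",                                  // shardName
                          collectionVersion.epoch(),                    // epoch
                          ChunkRange(BSON("a" << 1), BSON("a" << 10)),  // chunk to split
                          {BSON("a" << 5)});                            // split points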
diff --git a/src/mongo/s/request_types/split_chunk_request_type.h b/src/mongo/s/request_types/split_chunk_request_type.h
index 35a4b767923..f8fd4972a9d 100644
--- a/src/mongo/s/request_types/split_chunk_request_type.h
+++ b/src/mongo/s/request_types/split_chunk_request_type.h
@@ -45,10 +45,10 @@ namespace mongo {
class SplitChunkRequest {
public:
SplitChunkRequest(NamespaceString nss,
+ std::string shardName,
OID epoch,
ChunkRange chunkRange,
- std::vector<BSONObj> splitPoints,
- std::string shardName);
+ std::vector<BSONObj> splitPoints);
/**
* Parses the provided BSON content as the internal _configsvrSplitChunk command, and if