author    | Spencer T Brody <spencer@mongodb.com> | 2016-08-30 18:13:32 -0400
committer | Spencer T Brody <spencer@mongodb.com> | 2016-09-06 12:44:46 -0400
commit    | f8872dc852e32ff1cdfcead7b530bc5c350edfcf (patch)
tree      | 9e30188a1a43c04178360ffd979651692d2fe7dc /src
parent    | e0a2e534e4c917d6abe7f6d00c1eaf75fa9f2358 (diff)
download  | mongo-f8872dc852e32ff1cdfcead7b530bc5c350edfcf.tar.gz
SERVER-25002 Wire in new _configsvrMergeChunks command so shards no longer run applyOps directly
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/SConscript                                         |   1
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp                           | 343
-rw-r--r-- | src/mongo/db/s/split_chunk_command.cpp                            |  44
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp     | 136
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h       |   5
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp |  58
-rw-r--r-- | src/mongo/s/request_types/merge_chunk_request_type.cpp            |   6
-rw-r--r-- | src/mongo/s/request_types/merge_chunk_request_type.h              |   4
-rw-r--r-- | src/mongo/s/request_types/split_chunk_request_type.cpp            |  10
-rw-r--r-- | src/mongo/s/request_types/split_chunk_request_type.h              |   4
10 files changed, 317 insertions, 294 deletions
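
Before the diff itself, a sketch of the protocol change: rather than running applyOps against config.chunks itself, the shard now serializes a MergeChunkRequest and sends it to the config server primary, which commits the merge through ShardingCatalogManager::commitChunkMerge. This minimal sketch is assembled from the calls visible in merge_chunks_command.cpp below; the "shard" field name comes from kShardName in merge_chunk_request_type.cpp, and everything else mirrors the patch rather than adding to it:

    // Build the request: namespace, owning shard, collection epoch, and the
    // full list of chunk boundaries [minKey, intermediate bounds..., maxKey].
    MergeChunkRequest request{
        nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries};

    // Serialize to the _configsvrMergeChunk command object, attaching majority
    // write concern, then run it on the config server primary with idempotent retry.
    auto configCmdObj =
        request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
    auto cmdResponseStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
        txn,
        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
        "admin",
        configCmdObj,
        Shard::RetryPolicy::kIdempotent);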
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index bbf906085bf..4bccdd47e45 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -89,6 +89,7 @@ env.Library(
         'config/configsvr_add_shard_to_zone_command.cpp',
         'config/configsvr_commit_chunk_migration_command.cpp',
         'config/configsvr_control_balancer_command.cpp',
+        'config/configsvr_merge_chunk_command.cpp',
         'config/configsvr_move_chunk_command.cpp',
         'config/configsvr_remove_shard_from_zone_command.cpp',
         'config/configsvr_set_feature_compatibility_version_command.cpp',
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 9619e40bdb1..bfc24d2dfc7 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -42,7 +42,9 @@
 #include "mongo/db/s/metadata_manager.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/request_types/merge_chunk_request_type.h"
 #include "mongo/util/log.h"
 #include "mongo/util/mongoutils/str.h"
@@ -55,152 +57,75 @@ using std::vector;
 
 namespace {
 
-BSONArray buildOpPrecond(const string& ns,
-                         const ShardId& shardName,
-                         const ChunkVersion& shardVersion) {
-    BSONArrayBuilder preCond;
-    BSONObjBuilder condB;
-    condB.append("ns", ChunkType::ConfigNS);
-    condB.append("q",
-                 BSON("query" << BSON(ChunkType::ns(ns)) << "orderby"
-                              << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+bool _checkMetadataForSuccess(OperationContext* txn,
+                              const NamespaceString& nss,
+                              const BSONObj& minKey,
+                              const BSONObj& maxKey) {
+    ScopedCollectionMetadata metadataAfterMerge;
     {
-        BSONObjBuilder resB(condB.subobjStart("res"));
-        shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod());
-        resB.done();
-    }
-    preCond.append(condB.obj());
-    return preCond.arr();
-}
-
-BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) {
-    BSONObjBuilder opB;
-
-    // Op basics
-    opB.append("op", "u");
-    opB.appendBool("b", false);  // no upserting
-    opB.append("ns", ChunkType::ConfigNS);
-
-    // New object
-    opB.append("o", mergedChunk.toBSON());
-
-    // Query object
-    opB.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
-
-    return opB.obj();
-}
-
-BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) {
-    BSONObjBuilder opB;
-
-    // Op basics
-    opB.append("op", "d");  // delete
-    opB.append("ns", ChunkType::ConfigNS);
-
-    opB.append("o", BSON(ChunkType::name(chunkToRemove.getName())));
-
-    return opB.obj();
-}
-
-BSONObj buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge,
-                           const ChunkVersion& currShardVersion,
-                           const ChunkVersion& newMergedVersion) {
-    BSONObjBuilder logDetailB;
-
-    BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged"));
+        AutoGetCollection autoColl(txn, nss, MODE_IS);
 
-    for (const ChunkType& chunkToMerge : chunksToMerge) {
-        mergedB.append(chunkToMerge.toBSON());
+        // Get collection metadata
+        metadataAfterMerge = CollectionShardingState::get(txn, nss.ns())->getMetadata();
     }
-    mergedB.done();
-
-    currShardVersion.addToBSON(logDetailB, "prevShardVersion");
-    newMergedVersion.addToBSON(logDetailB, "mergedVersion");
-
-    return logDetailB.obj();
-}
-
-Status runApplyOpsCmd(OperationContext* txn,
-                      const std::vector<ChunkType>& chunksToMerge,
-                      const ChunkVersion& currShardVersion,
-                      const ChunkVersion& newMergedVersion) {
-    BSONArrayBuilder updatesB;
-
-    // The chunk we'll be "expanding" is the first chunk
-    const ChunkType& firstChunk = chunksToMerge.front();
-
-    // Fill in details not tracked by metadata
-    ChunkType mergedChunk(firstChunk);
-    mergedChunk.setMax(chunksToMerge.back().getMax());
-    mergedChunk.setVersion(newMergedVersion);
-
-    updatesB.append(buildOpMergeChunk(mergedChunk));
-
-    // Don't remove chunk we're expanding
-    for (size_t i = 1; i < chunksToMerge.size(); ++i) {
-        ChunkType chunkToMerge(chunksToMerge[i]);
-        updatesB.append(buildOpRemoveChunk(chunkToMerge));
+    ChunkType chunk;
+    if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) {
+        return false;
     }
 
-    BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion);
-
-    return grid.catalogClient(txn)->applyChunkOpsDeprecated(
-        txn,
-        updatesB.arr(),
-        preCond,
-        firstChunk.getNS(),
-        newMergedVersion,
-        ShardingCatalogClient::kMajorityWriteConcern,
-        repl::ReadConcernLevel::kMajorityReadConcern);
+    return chunk.getMin().woCompare(minKey) == 0 && chunk.getMax().woCompare(maxKey) == 0;
 }
 
-bool mergeChunks(OperationContext* txn,
-                 const NamespaceString& nss,
-                 const BSONObj& minKey,
-                 const BSONObj& maxKey,
-                 const OID& epoch,
-                 string* errMsg) {
+Status mergeChunks(OperationContext* txn,
+                   const NamespaceString& nss,
+                   const BSONObj& minKey,
+                   const BSONObj& maxKey,
+                   const OID& epoch) {
     // Get the distributed lock
+    // TODO(SERVER-25086): Remove distLock acquisition from merge chunk
     const string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey
                                        << " to " << maxKey;
 
     auto scopedDistLock = grid.catalogClient(txn)->distLock(
         txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
 
     if (!scopedDistLock.isOK()) {
-        *errMsg = stream() << "could not acquire collection lock for " << nss.ns()
-                           << " to merge chunks in [" << redact(minKey) << "," << redact(maxKey)
-                           << ") " << causedBy(scopedDistLock.getStatus());
+        std::string errmsg = stream() << "could not acquire collection lock for " << nss.ns()
+                                      << " to merge chunks in [" << redact(minKey) << ", "
+                                      << redact(maxKey) << ")"
+                                      << causedBy(scopedDistLock.getStatus());
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(scopedDistLock.getStatus().code(), errmsg);
     }
 
-    ShardingState* gss = ShardingState::get(txn);
+    ShardingState* shardingState = ShardingState::get(txn);
 
     //
     // We now have the collection lock, refresh metadata to latest version and sanity check
     //
 
     ChunkVersion shardVersion;
-    Status status = gss->refreshMetadataNow(txn, nss.ns(), &shardVersion);
+    Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion);
 
-    if (!status.isOK()) {
-        *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for "
-                                << nss.ns() << causedBy(redact(status));
+    if (!refreshStatus.isOK()) {
+        std::string errmsg = str::stream()
+            << "could not merge chunks, failed to refresh metadata for " << nss.ns()
+            << causedBy(redact(refreshStatus));
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(refreshStatus.code(), errmsg);
     }
 
     if (epoch.isSet() && shardVersion.epoch() != epoch) {
-        *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed"
-                           << " since merge was sent"
-                           << "(sent epoch : " << epoch.toString()
-                           << ", current epoch : " << shardVersion.epoch().toString() << ")";
-
-        warning() << *errMsg;
-        return false;
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " has changed"
+            << " since merge was sent"
+            << "(sent epoch : " << epoch.toString()
+            << ", current epoch : " << shardVersion.epoch().toString() << ")";
+
+        warning() << errmsg;
+        return Status(ErrorCodes::StaleEpoch, errmsg);
     }
 
     ScopedCollectionMetadata metadata;
@@ -209,52 +134,55 @@ bool mergeChunks(OperationContext* txn,
         metadata = CollectionShardingState::get(txn, nss.ns())->getMetadata();
 
         if (!metadata || metadata->getKeyPattern().isEmpty()) {
-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " is not sharded";
+            std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+                                          << " is not sharded";
 
-            warning() << *errMsg;
-            return false;
+            warning() << errmsg;
+            return Status(ErrorCodes::IllegalOperation, errmsg);
         }
     }
 
     dassert(metadata->getShardVersion().equals(shardVersion));
 
     if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
-        *errMsg = stream() << "could not merge chunks, the range "
-                           << redact(rangeToString(minKey, maxKey)) << " is not valid"
-                           << " for collection " << nss.ns() << " with key pattern "
-                           << metadata->getKeyPattern().toString();
+        std::string errmsg = stream() << "could not merge chunks, the range "
+                                      << redact(rangeToString(minKey, maxKey)) << " is not valid"
+                                      << " for collection " << nss.ns() << " with key pattern "
+                                      << metadata->getKeyPattern().toString();
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
     }
 
     //
     // Get merged chunk information
     //
     std::vector<ChunkType> chunksToMerge;
+    std::vector<BSONObj> chunkBoundaries;
+    chunkBoundaries.push_back(minKey);
 
     ChunkType itChunk;
     itChunk.setMin(minKey);
     itChunk.setMax(minKey);
     itChunk.setNS(nss.ns());
-    itChunk.setShard(gss->getShardName());
+    itChunk.setShard(shardingState->getShardName());
 
     while (itChunk.getMax().woCompare(maxKey) < 0 &&
            metadata->getNextChunk(itChunk.getMax(), &itChunk)) {
+        chunkBoundaries.push_back(itChunk.getMax());
         chunksToMerge.push_back(itChunk);
     }
 
     if (chunksToMerge.empty()) {
-        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                           << " range starting at " << redact(minKey) << " and ending at "
-                           << redact(maxKey) << " does not belong to shard " << gss->getShardName();
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " range starting at "
+            << redact(minKey) << " and ending at " << redact(maxKey)
+            << " does not belong to shard " << shardingState->getShardName();
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
     }
 
     //
@@ -267,12 +195,12 @@ bool mergeChunks(OperationContext* txn,
     bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey);
 
     if (!minKeyInRange) {
-        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                           << " range starting at " << redact(minKey)
-                           << " does not belong to shard " << gss->getShardName();
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " range starting at "
+            << redact(minKey) << " does not belong to shard " << shardingState->getShardName();
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
     }
 
     BSONObj lastDocMin = chunksToMerge.back().getMin();
@@ -281,89 +209,121 @@ bool mergeChunks(OperationContext* txn,
     bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0;
 
     if (!maxKeyInRange) {
-        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                           << " range ending at " << redact(maxKey) << " does not belong to shard "
-                           << gss->getShardName();
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " range ending at "
+            << redact(maxKey) << " does not belong to shard " << shardingState->getShardName();
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
     }
 
     bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0;
     bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0;
 
     if (!validRangeStartKey || !validRangeEndKey) {
-        *errMsg =
-            stream() << "could not merge chunks, collection " << nss.ns()
-                     << " does not contain a chunk "
-                     << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
-                     << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
-                     << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
-
-        warning() << *errMsg;
-        return false;
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " does not contain a chunk "
+            << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
+            << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
+            << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
    }
 
     if (chunksToMerge.size() == 1) {
-        *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                           << " already contains chunk for "
-                           << redact(rangeToString(minKey, maxKey));
+        std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+                                      << " already contains chunk for "
+                                      << redact(rangeToString(minKey, maxKey));
 
-        warning() << *errMsg;
-        return false;
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
     }
 
+    // Look for hole in range
     for (size_t i = 1; i < chunksToMerge.size(); ++i) {
         if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
-            *errMsg = stream() << "could not merge chunks, collection " << nss.ns()
-                               << " has a hole in the range "
-                               << redact(rangeToString(minKey, maxKey)) << " at "
-                               << redact(rangeToString(chunksToMerge[i - 1].getMax(),
-                                                       chunksToMerge[i].getMin()));
+            std::string errmsg = stream()
                << "could not merge chunks, collection " << nss.ns()
+                << " has a hole in the range " << redact(rangeToString(minKey, maxKey)) << " at "
+                << redact(rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin()));
 
-            warning() << *errMsg;
-            return false;
+            warning() << errmsg;
+            return Status(ErrorCodes::IllegalOperation, errmsg);
         }
     }
 
-    //
-    // Run apply ops command
-    //
-    ChunkVersion mergeVersion = metadata->getCollVersion();
-    mergeVersion.incMinor();
-
-    // Ensure that the newly applied chunks would result in a correct metadata state
-    auto metadataAfterMerge = uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion));
+    {
+        // Ensure that the newly applied chunks would result in a correct metadata state
+        ChunkVersion mergeVersion = metadata->getCollVersion();
+        mergeVersion.incMinor();
 
-    Status applyOpsStatus = runApplyOpsCmd(txn, chunksToMerge, shardVersion, mergeVersion);
-    if (!applyOpsStatus.isOK()) {
-        warning() << redact(applyOpsStatus);
-        return false;
+        uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion));
     }
 
     //
-    // Install merged chunk metadata
+    // Run _configsvrMergeChunks.
     //
+    MergeChunkRequest request{
+        nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries};
 
-    {
-        ScopedTransaction scopedXact(txn, MODE_IX);
-        AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
-
-        auto css = CollectionShardingState::get(txn, nss);
-        css->refreshMetadata(txn, std::move(metadataAfterMerge));
-    }
+    auto configCmdObj =
+        request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+    auto cmdResponseStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
+        txn,
+        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+        "admin",
+        configCmdObj,
+        Shard::RetryPolicy::kIdempotent);
 
     //
-    // Log change
+    // Refresh metadata to pick up new chunk definitions (regardless of the results returned from
+    // running _configsvrMergeChunk).
     //
+    {
+        ChunkVersion shardVersionAfterMerge;
+        refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge);
+
+        if (!refreshStatus.isOK()) {
+            std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk ["
+                                               << redact(minKey) << "," << redact(maxKey) << ") "
+                                               << redact(refreshStatus);
 
-    BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion);
+            warning() << errmsg;
+            return Status(refreshStatus.code(), errmsg);
+        }
+    }
 
-    grid.catalogClient(txn)->logChange(
-        txn, "merge", nss.ns(), mergeLogEntry, ShardingCatalogClient::kMajorityWriteConcern);
+    // If we failed to get any response from the config server at all, despite retries, then we
+    // should just go ahead and fail the whole operation.
+    if (!cmdResponseStatus.isOK()) {
+        return cmdResponseStatus.getStatus();
+    }
+
+    // If _configsvrMergeChunk returned an error, look at this shard's metadata to determine if
+    // the merge actually did happen. This can happen if there's a network error getting the
+    // response from the first call to _configsvrMergeChunk, but it actually succeeds, thus the
+    // automatic retry fails with a precondition violation, for example.
+    auto commandStatus = std::move(cmdResponseStatus.getValue().commandStatus);
+    auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus);
+
+    if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
+        _checkMetadataForSuccess(txn, nss, minKey, maxKey)) {
+
+        LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey)
+               << ") has already been committed.";
+    } else if (!commandStatus.isOK()) {
+        std::string errmsg = str::stream() << "Failed to commit chunk merge"
+                                           << causedBy(redact(commandStatus));
+        return Status(commandStatus.code(), errmsg);
+    } else if (!writeConcernStatus.isOK()) {
+        std::string errmsg = str::stream() << "Failed to commit chunk merge"
+                                           << causedBy(redact(writeConcernStatus));
+        return Status(writeConcernStatus.code(), errmsg);
    }
 
-    return true;
+    return Status::OK();
 }
 
 class MergeChunksCommand : public Command {
@@ -492,7 +452,8 @@ public:
             return false;
         }
 
-        return mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch, &errmsg);
+        auto mergeStatus = mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch);
+        return appendCommandStatus(result, mergeStatus);
     }
 
 } mergeChunksCmd;
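
The split command below applies the same commit-and-verify pattern the merge path above just adopted. Distilled for reference (names as in the patch; this condensation is the editor's, not additional commit code):

    // A retried idempotent commit can fail its applyOps precondition because the
    // *first* attempt already committed but its response was lost in transit, so
    // an error reply alone is not proof that nothing changed. The shard therefore
    // refreshes and re-reads its own metadata before trusting the error.
    if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
        _checkMetadataForSuccess(txn, nss, minKey, maxKey)) {
        // The merge was committed by an earlier attempt; treat this as success.
        return Status::OK();
    }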
diff --git a/src/mongo/db/s/split_chunk_command.cpp b/src/mongo/db/s/split_chunk_command.cpp
index ae37a92af65..c28ea1a5cec 100644
--- a/src/mongo/db/s/split_chunk_command.cpp
+++ b/src/mongo/db/s/split_chunk_command.cpp
@@ -100,10 +100,10 @@ bool checkIfSingleDoc(OperationContext* txn,
 // using the specified splitPoints. Returns false if the metadata's chunks don't match
 // the new chunk boundaries exactly.
 //
-bool checkMetadataForSuccess(OperationContext* txn,
-                             const NamespaceString& nss,
-                             const ChunkRange& chunkRange,
-                             const std::vector<BSONObj>& splitKeys) {
+bool _checkMetadataForSuccess(OperationContext* txn,
+                              const NamespaceString& nss,
+                              const ChunkRange& chunkRange,
+                              const std::vector<BSONObj>& splitKeys) {
     ScopedCollectionMetadata metadataAfterSplit;
     {
         AutoGetCollection autoColl(txn, nss, MODE_IS);
@@ -118,13 +118,9 @@ bool checkMetadataForSuccess(OperationContext* txn,
 
     ChunkType nextChunk;
     for (const auto& endKey : newChunkBounds) {
-        log() << "checking metadataAfterSplit for new chunk boundaries [" << redact(startKey) << ","
-              << redact(endKey) << ")";
         // Check that all new chunks fit the new chunk boundaries
         if (!metadataAfterSplit->getNextChunk(startKey, &nextChunk) ||
             nextChunk.getMax().woCompare(endKey)) {
-            log() << "ERROR, found [" << redact(startKey) << "," << redact(nextChunk.getMax())
-                  << ")";
             return false;
         }
 
@@ -254,7 +250,7 @@ public:
 
         //
         // Lock the collection's metadata and get highest version for the current shard
-        // TODO(SERVER-25086): Remove distLock aquisition from split chunk
+        // TODO(SERVER-25086): Remove distLock acquisition from split chunk
         //
         const string whyMessage(str::stream() << "splitting chunk [" << min << ", " << max
                                               << ") in "
@@ -350,10 +346,10 @@ public:
         log() << "splitChunk accepted at version " << shardVersion;
 
         auto request = SplitChunkRequest(
-            nss, expectedCollectionVersion.epoch(), chunkRange, splitKeys, shardName);
+            nss, shardName, expectedCollectionVersion.epoch(), chunkRange, splitKeys);
 
-        auto configCmdObj = request.toConfigCommandBSON(
-            BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+        auto configCmdObj =
+            request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
         auto cmdResponseStatus =
             Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
@@ -366,17 +362,23 @@ public:
         //
         // Refresh chunk metadata regardless of whether or not the split succeeded
         //
-        ChunkVersion shardVersionAfterSplit;
-        refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterSplit);
+        {
+            ChunkVersion shardVersionAfterSplit;
+            refreshStatus =
+                shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterSplit);
 
-        if (!refreshStatus.isOK()) {
-            errmsg = str::stream() << "failed to refresh metadata for split chunk [" << redact(min)
-                                   << "," << redact(max) << ") " << causedBy(redact(refreshStatus));
+            if (!refreshStatus.isOK()) {
+                errmsg = str::stream() << "failed to refresh metadata for split chunk ["
+                                       << redact(min) << "," << redact(max) << ") "
+                                       << causedBy(redact(refreshStatus));
 
-            warning() << errmsg;
-            return false;
+                warning() << errmsg;
+                return false;
+            }
         }
 
+        // If we failed to get any response from the config server at all, despite retries, then we
+        // should just go ahead and fail the whole operation.
         if (!cmdResponseStatus.isOK())
             return appendCommandStatus(result, cmdResponseStatus.getStatus());
 
@@ -400,13 +402,13 @@ public:
         }
 
         //
-        // If _configsvrSplitChunk returned an error, look at this shard's metadata to deterine if
+        // If _configsvrSplitChunk returned an error, look at this shard's metadata to determine if
         // the split actually did happen. This can happen if there's a network error getting the
         // response from the first call to _configsvrSplitChunk, but it actually succeeds, thus the
         // automatic retry fails with a precondition violation, for example.
         //
         if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
-            checkMetadataForSuccess(txn, nss, chunkRange, splitKeys)) {
+            _checkMetadataForSuccess(txn, nss, chunkRange, splitKeys)) {
 
             LOG(1) << "splitChunk [" << redact(min) << "," << redact(max)
                    << ") has already been committed.";
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index 48f0b6b99be..b27c588fa2d 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -102,7 +102,7 @@ const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::
 /**
  * Append min, max and version information from chunk to the buffer for logChange purposes.
  */
-void appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
+void _appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
     BSONObjBuilder bb(*b);
     bb.append(ChunkType::min(), chunk.getMin());
     bb.append(ChunkType::max(), chunk.getMax());
@@ -228,6 +228,69 @@ StatusWith<ChunkRange> includeFullShardKey(OperationContext* txn,
                                            shardKeyPattern.extendRangeBound(range.getMax(), false));
 }
 
+BSONArray _buildMergeChunksApplyOpsUpdates(const std::vector<ChunkType>& chunksToMerge,
+                                           const ChunkVersion& mergeVersion) {
+    BSONArrayBuilder updates;
+
+    // Build an update operation to expand the first chunk into the newly merged chunk
+    {
+        BSONObjBuilder op;
+        op.append("op", "u");
+        op.appendBool("b", false);  // no upsert
+        op.append("ns", ChunkType::ConfigNS);
+
+        // expand first chunk into newly merged chunk
+        ChunkType mergedChunk(chunksToMerge.front());
+        mergedChunk.setMax(chunksToMerge.back().getMax());
+
+        // fill in additional details for sending through applyOps
+        mergedChunk.setVersion(mergeVersion);
+
+        // add the new chunk information as the update object
+        op.append("o", mergedChunk.toBSON());
+
+        // query object
+        op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
+
+        updates.append(op.obj());
+    }
+
+    // Build update operations to delete the rest of the chunks to be merged. Remember not
+    // to delete the first chunk we're expanding
+    for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+        BSONObjBuilder op;
+        op.append("op", "d");
+        op.append("ns", ChunkType::ConfigNS);
+
+        op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
+
+        updates.append(op.obj());
+    }
+
+    return updates.arr();
+}
+
+BSONArray _buildMergeChunksApplyOpsPrecond(const std::vector<ChunkType>& chunksToMerge,
+                                           const ChunkVersion& collVersion) {
+    BSONArrayBuilder preCond;
+
+    for (auto chunk : chunksToMerge) {
+        BSONObjBuilder b;
+        b.append("ns", ChunkType::ConfigNS);
+        b.append(
+            "q",
+            BSON("query" << BSON(ChunkType::ns(chunk.getNS()) << ChunkType::min(chunk.getMin())
+                                                              << ChunkType::max(chunk.getMax()))
+                         << "orderby"
+                         << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+        b.append("res",
+                 BSON(ChunkType::DEPRECATED_epoch(collVersion.epoch())
+                      << ChunkType::shard(chunk.getShard().toString())));
+        preCond.append(b.obj());
+    }
+    return preCond.arr();
+}
+
 }  // namespace
 
 ShardingCatalogManagerImpl::ShardingCatalogManagerImpl(
@@ -1212,8 +1275,8 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
         }
 
         if (newChunks.size() == 2) {
-            _appendShortVersion(logDetail.subobjStart("left"), newChunks[0]);
-            _appendShortVersion(logDetail.subobjStart("right"), newChunks[1]);
+            _appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]);
+            _appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]);
 
             grid.catalogClient(txn)->logChange(
                 txn, "split", ns.ns(), logDetail.obj(), WriteConcernOptions());
@@ -1227,7 +1290,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
                 chunkDetail.appendElements(beforeDetailObj);
                 chunkDetail.append("number", i + 1);
                 chunkDetail.append("of", newChunksSize);
-                _appendShortVersion(chunkDetail.subobjStart("chunk"), newChunks[i]);
+                _appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]);
 
                 grid.catalogClient(txn)->logChange(
                     txn, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions());
@@ -1305,62 +1368,14 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
     ChunkVersion mergeVersion = collVersion;
     mergeVersion.incMinor();
 
-    BSONArrayBuilder updates;
-
-    // Build an update operation to expand the first chunk into the newly merged chunk
-    {
-        BSONObjBuilder op;
-        op.append("op", "u");
-        op.appendBool("b", false);
-        op.append("ns", ChunkType::ConfigNS);
-
-        // expand first chunk into newly merged chunk
-        ChunkType mergedChunk(chunksToMerge.front());
-        mergedChunk.setMax(chunksToMerge.back().getMax());
-
-        // fill in additional details for sending through applyOps
-        mergedChunk.setVersion(mergeVersion);
-
-        // add the new chunk information as the update object
-        op.append("o", mergedChunk.toBSON());
-
-        // query object
-        op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
-
-        updates.append(op.obj());
-    }
-
-    // Build update operations to delete the rest of the chunks to be merged. Remember not
-    // to delete the first chunk we're expanding
-    for (size_t i = 1; i < chunksToMerge.size(); ++i) {
-        BSONObjBuilder op;
-        op.append("op", "d");
-        op.append("ns", ChunkType::ConfigNS);
-
-        op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
-
-        updates.append(op.obj());
-    }
-
-    BSONArrayBuilder preCond;
-    {
-        BSONObjBuilder b;
-        b.append("ns", ChunkType::ConfigNS);
-        b.append("q",
-                 BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby"
-                              << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
-        {
-            BSONObjBuilder bb(b.subobjStart("res"));
-            collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod());
-        }
-        preCond.append(b.obj());
-    }
+    auto updates = _buildMergeChunksApplyOpsUpdates(chunksToMerge, mergeVersion);
+    auto preCond = _buildMergeChunksApplyOpsPrecond(chunksToMerge, collVersion);
 
     // apply the batch of updates to remote and local metadata
     Status applyOpsStatus =
         grid.catalogClient(txn)->applyChunkOpsDeprecated(txn,
-                                                         updates.arr(),
-                                                         preCond.arr(),
+                                                         updates,
+                                                         preCond,
                                                          ns.ns(),
                                                          mergeVersion,
                                                          WriteConcernOptions(),
@@ -1386,15 +1401,6 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
     return applyOpsStatus;
 }
 
-void ShardingCatalogManagerImpl::_appendShortVersion(BufBuilder& b, const ChunkType& chunk) {
-    BSONObjBuilder bb(b);
-    bb.append(ChunkType::min(), chunk.getMin());
-    bb.append(ChunkType::max(), chunk.getMax());
-    if (chunk.isVersionSet())
-        chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod());
-    bb.done();
-}
-
 void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
     _executorForAddShard->appendConnectionStats(stats);
 }
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index aa4c437151b..5294e31ace7 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -195,11 +195,6 @@ private:
     StatusWith<std::vector<ShardType>> _getAllShardingUnawareShards(OperationContext* txn);
 
     /**
-     * Append min, max and version information from chunk to the buffer for logChange purposes.
-     */
-    void _appendShortVersion(BufBuilder& b, const ChunkType& chunk);
-
-    /**
      * Callback function used when rescheduling an addShard task after the first attempt failed.
      * Checks if the callback has been canceled, and if not, proceeds to call
      * _scheduleAddShardTask.
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
index 23e3803379d..f3ae19faa30 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
@@ -361,5 +361,63 @@ TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) {
     ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus);
 }
 
+TEST_F(MergeChunkTest, MergeAlreadyHappenedFailsPrecondition) {
+    ChunkType chunk;
+    chunk.setNS("TestDB.TestColl");
+
+    auto origVersion = ChunkVersion(1, 0, OID::gen());
+    chunk.setVersion(origVersion);
+    chunk.setShard(ShardId("shard0000"));
+
+    // Construct chunk to be merged
+    auto chunk2(chunk);
+
+    auto chunkMin = BSON("a" << 1);
+    auto chunkBound = BSON("a" << 5);
+    auto chunkMax = BSON("a" << 10);
+    // first chunk boundaries
+    chunk.setMin(chunkMin);
+    chunk.setMax(chunkBound);
+    // second chunk boundaries
+    chunk2.setMin(chunkBound);
+    chunk2.setMax(chunkMax);
+
+    std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+    ChunkType mergedChunk(chunk);
+    auto mergedVersion = chunk.getVersion();
+    mergedVersion.incMinor();
+    mergedChunk.setVersion(mergedVersion);
+    mergedChunk.setMax(chunkMax);
+
+    setupChunks({mergedChunk});
+
+    ASSERT_EQ(ErrorCodes::BadValue,
+              catalogManager()->commitChunkMerge(operationContext(),
+                                                 NamespaceString("TestDB.TestColl"),
+                                                 origVersion.epoch(),
+                                                 chunkBoundaries,
+                                                 "shard0000"));
+
+    // Verify that no change to config.chunks happened.
+    auto findResponse = uassertStatusOK(
+        getConfigShard()->exhaustiveFindOnConfig(operationContext(),
+                                                 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                                 repl::ReadConcernLevel::kLocalReadConcern,
+                                                 NamespaceString(ChunkType::ConfigNS),
+                                                 BSON(ChunkType::ns() << "TestDB.TestColl"),
+                                                 BSON(ChunkType::DEPRECATED_lastmod() << -1),
+                                                 boost::none));
+
+    const auto& chunksVector = findResponse.docs;
+
+    // There should be exactly one chunk left in the collection
+    ASSERT_EQ(1u, chunksVector.size());
+
+    // MergedChunk should have range [chunkMin, chunkMax]
+    ChunkType foundChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front()));
+    ASSERT_BSONOBJ_EQ(mergedChunk.toBSON(), foundChunk.toBSON());
+}
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp
index 508bf9b247d..103c5087368 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp
@@ -48,9 +48,9 @@ const char kShardName[] = "shard";
 }  // unnamed namespace
 
 MergeChunkRequest::MergeChunkRequest(NamespaceString nss,
+                                     string shardName,
                                      OID epoch,
-                                     vector<BSONObj> chunkBoundaries,
-                                     string shardName)
+                                     vector<BSONObj> chunkBoundaries)
     : _nss(std::move(nss)),
       _epoch(std::move(epoch)),
       _chunkBoundaries(std::move(chunkBoundaries)),
@@ -94,7 +94,7 @@ StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BS
     }
 
     auto request = MergeChunkRequest(
-        NamespaceString(ns), std::move(epoch), std::move(chunkBoundaries), std::move(shardName));
+        NamespaceString(ns), std::move(shardName), std::move(epoch), std::move(chunkBoundaries));
     Status validationStatus = request._validate();
     if (!validationStatus.isOK()) {
         return validationStatus;
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h
index 5aa72ab6356..31cc6e69b19 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.h
+++ b/src/mongo/s/request_types/merge_chunk_request_type.h
@@ -44,9 +44,9 @@ namespace mongo {
 class MergeChunkRequest {
 public:
     MergeChunkRequest(NamespaceString nss,
+                      std::string shardName,
                       OID epoch,
-                      std::vector<BSONObj> chunkBoundaries,
-                      std::string shardName);
+                      std::vector<BSONObj> chunkBoundaries);
 
     /**
      * Parses the provided BSON content as the internal _configsvrMergeChunk command, and if
diff --git a/src/mongo/s/request_types/split_chunk_request_type.cpp b/src/mongo/s/request_types/split_chunk_request_type.cpp
index 78c2c478d0c..a3ca50c7f32 100644
--- a/src/mongo/s/request_types/split_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/split_chunk_request_type.cpp
@@ -48,10 +48,10 @@ const char kShardName[] = "shard";
 }  // unnamed namespace
 
 SplitChunkRequest::SplitChunkRequest(NamespaceString nss,
+                                     string shardName,
                                      OID epoch,
                                      ChunkRange chunkRange,
-                                     vector<BSONObj> splitPoints,
-                                     string shardName)
+                                     vector<BSONObj> splitPoints)
     : _nss(std::move(nss)),
       _epoch(std::move(epoch)),
       _chunkRange(std::move(chunkRange)),
@@ -102,10 +102,10 @@ StatusWith<SplitChunkRequest> SplitChunkRequest::parseFromConfigCommand(const BS
     }
 
     auto request = SplitChunkRequest(NamespaceString(ns),
+                                     std::move(shardName),
                                      std::move(epoch),
                                      std::move(chunkRangeStatus.getValue()),
-                                     std::move(splitPoints),
-                                     std::move(shardName));
+                                     std::move(splitPoints));
     Status validationStatus = request._validate();
     if (!validationStatus.isOK()) {
         return validationStatus;
@@ -160,7 +160,7 @@ const string& SplitChunkRequest::getShardName() const {
 Status SplitChunkRequest::_validate() {
     if (!getNamespace().isValid()) {
         return Status(ErrorCodes::InvalidNamespace,
-                      str::stream() << "invalid namespaace '" << _nss.ns()
+                      str::stream() << "invalid namespace '" << _nss.ns()
                                     << "' specified for request");
     }
diff --git a/src/mongo/s/request_types/split_chunk_request_type.h b/src/mongo/s/request_types/split_chunk_request_type.h
index 35a4b767923..f8fd4972a9d 100644
--- a/src/mongo/s/request_types/split_chunk_request_type.h
+++ b/src/mongo/s/request_types/split_chunk_request_type.h
@@ -45,10 +45,10 @@ namespace mongo {
 class SplitChunkRequest {
 public:
     SplitChunkRequest(NamespaceString nss,
+                      std::string shardName,
                       OID epoch,
                       ChunkRange chunkRange,
-                      std::vector<BSONObj> splitPoints,
-                      std::string shardName);
+                      std::vector<BSONObj> splitPoints);
 
     /**
      * Parses the provided BSON content as the internal _configsvrSplitChunk command, and if