diff options
author | Spencer T Brody <spencer@mongodb.com> | 2016-08-30 18:13:32 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2016-09-06 12:44:46 -0400 |
commit | f8872dc852e32ff1cdfcead7b530bc5c350edfcf (patch) | |
tree | 9e30188a1a43c04178360ffd979651692d2fe7dc /src/mongo/db/s/merge_chunks_command.cpp | |
parent | e0a2e534e4c917d6abe7f6d00c1eaf75fa9f2358 (diff) | |
download | mongo-f8872dc852e32ff1cdfcead7b530bc5c350edfcf.tar.gz |
SERVER-25002 Wire in new _configsvrMergeChunks command so shards no longer run applyOps directly
Diffstat (limited to 'src/mongo/db/s/merge_chunks_command.cpp')
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 343 |
1 file changed, 152 insertions, 191 deletions
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index 9619e40bdb1..bfc24d2dfc7 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -42,7 +42,9 @@ #include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/request_types/merge_chunk_request_type.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -55,152 +57,75 @@ using std::vector; namespace { -BSONArray buildOpPrecond(const string& ns, - const ShardId& shardName, - const ChunkVersion& shardVersion) { - BSONArrayBuilder preCond; - BSONObjBuilder condB; - condB.append("ns", ChunkType::ConfigNS); - condB.append("q", - BSON("query" << BSON(ChunkType::ns(ns)) << "orderby" - << BSON(ChunkType::DEPRECATED_lastmod() << -1))); +bool _checkMetadataForSuccess(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& minKey, + const BSONObj& maxKey) { + ScopedCollectionMetadata metadataAfterMerge; { - BSONObjBuilder resB(condB.subobjStart("res")); - shardVersion.addToBSON(resB, ChunkType::DEPRECATED_lastmod()); - resB.done(); - } - preCond.append(condB.obj()); - return preCond.arr(); -} - -BSONObj buildOpMergeChunk(const ChunkType& mergedChunk) { - BSONObjBuilder opB; - - // Op basics - opB.append("op", "u"); - opB.appendBool("b", false); // no upserting - opB.append("ns", ChunkType::ConfigNS); - - // New object - opB.append("o", mergedChunk.toBSON()); - - // Query object - opB.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); - - return opB.obj(); -} - -BSONObj buildOpRemoveChunk(const ChunkType& chunkToRemove) { - BSONObjBuilder opB; - - // Op basics - opB.append("op", "d"); // delete - opB.append("ns", ChunkType::ConfigNS); - - opB.append("o", BSON(ChunkType::name(chunkToRemove.getName()))); - - return opB.obj(); -} - -BSONObj 
buildMergeLogEntry(const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& currShardVersion, - const ChunkVersion& newMergedVersion) { - BSONObjBuilder logDetailB; - - BSONArrayBuilder mergedB(logDetailB.subarrayStart("merged")); + AutoGetCollection autoColl(txn, nss, MODE_IS); - for (const ChunkType& chunkToMerge : chunksToMerge) { - mergedB.append(chunkToMerge.toBSON()); + // Get collection metadata + metadataAfterMerge = CollectionShardingState::get(txn, nss.ns())->getMetadata(); } - mergedB.done(); - - currShardVersion.addToBSON(logDetailB, "prevShardVersion"); - newMergedVersion.addToBSON(logDetailB, "mergedVersion"); - - return logDetailB.obj(); -} - -Status runApplyOpsCmd(OperationContext* txn, - const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& currShardVersion, - const ChunkVersion& newMergedVersion) { - BSONArrayBuilder updatesB; - - // The chunk we'll be "expanding" is the first chunk - const ChunkType& firstChunk = chunksToMerge.front(); - - // Fill in details not tracked by metadata - ChunkType mergedChunk(firstChunk); - mergedChunk.setMax(chunksToMerge.back().getMax()); - mergedChunk.setVersion(newMergedVersion); - - updatesB.append(buildOpMergeChunk(mergedChunk)); - - // Don't remove chunk we're expanding - for (size_t i = 1; i < chunksToMerge.size(); ++i) { - ChunkType chunkToMerge(chunksToMerge[i]); - updatesB.append(buildOpRemoveChunk(chunkToMerge)); + ChunkType chunk; + if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) { + return false; } - BSONArray preCond = buildOpPrecond(firstChunk.getNS(), firstChunk.getShard(), currShardVersion); - - return grid.catalogClient(txn)->applyChunkOpsDeprecated( - txn, - updatesB.arr(), - preCond, - firstChunk.getNS(), - newMergedVersion, - ShardingCatalogClient::kMajorityWriteConcern, - repl::ReadConcernLevel::kMajorityReadConcern); + return chunk.getMin().woCompare(minKey) == 0 && chunk.getMax().woCompare(maxKey) == 0; } -bool mergeChunks(OperationContext* txn, - const 
NamespaceString& nss, - const BSONObj& minKey, - const BSONObj& maxKey, - const OID& epoch, - string* errMsg) { +Status mergeChunks(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& minKey, + const BSONObj& maxKey, + const OID& epoch) { // Get the distributed lock + // TODO(SERVER-25086): Remove distLock acquisition from merge chunk const string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey << " to " << maxKey; auto scopedDistLock = grid.catalogClient(txn)->distLock( txn, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout); if (!scopedDistLock.isOK()) { - *errMsg = stream() << "could not acquire collection lock for " << nss.ns() - << " to merge chunks in [" << redact(minKey) << "," << redact(maxKey) - << ") " << causedBy(scopedDistLock.getStatus()); + std::string errmsg = stream() << "could not acquire collection lock for " << nss.ns() + << " to merge chunks in [" << redact(minKey) << ", " + << redact(maxKey) << ")" + << causedBy(scopedDistLock.getStatus()); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(scopedDistLock.getStatus().code(), errmsg); } - ShardingState* gss = ShardingState::get(txn); + ShardingState* shardingState = ShardingState::get(txn); // // We now have the collection lock, refresh metadata to latest version and sanity check // ChunkVersion shardVersion; - Status status = gss->refreshMetadataNow(txn, nss.ns(), &shardVersion); + Status refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion); - if (!status.isOK()) { - *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for " - << nss.ns() << causedBy(redact(status)); + if (!refreshStatus.isOK()) { + std::string errmsg = str::stream() + << "could not merge chunks, failed to refresh metadata for " << nss.ns() + << causedBy(redact(refreshStatus)); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(refreshStatus.code(), 
errmsg); } if (epoch.isSet() && shardVersion.epoch() != epoch) { - *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " has changed" - << " since merge was sent" - << "(sent epoch : " << epoch.toString() - << ", current epoch : " << shardVersion.epoch().toString() << ")"; - - warning() << *errMsg; - return false; + std::string errmsg = stream() + << "could not merge chunks, collection " << nss.ns() << " has changed" + << " since merge was sent" + << "(sent epoch : " << epoch.toString() + << ", current epoch : " << shardVersion.epoch().toString() << ")"; + + warning() << errmsg; + return Status(ErrorCodes::StaleEpoch, errmsg); } ScopedCollectionMetadata metadata; @@ -209,52 +134,55 @@ bool mergeChunks(OperationContext* txn, metadata = CollectionShardingState::get(txn, nss.ns())->getMetadata(); if (!metadata || metadata->getKeyPattern().isEmpty()) { - *errMsg = stream() << "could not merge chunks, collection " << nss.ns() - << " is not sharded"; + std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns() + << " is not sharded"; - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } } dassert(metadata->getShardVersion().equals(shardVersion)); if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) { - *errMsg = stream() << "could not merge chunks, the range " - << redact(rangeToString(minKey, maxKey)) << " is not valid" - << " for collection " << nss.ns() << " with key pattern " - << metadata->getKeyPattern().toString(); + std::string errmsg = stream() << "could not merge chunks, the range " + << redact(rangeToString(minKey, maxKey)) << " is not valid" + << " for collection " << nss.ns() << " with key pattern " + << metadata->getKeyPattern().toString(); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } // // Get merged chunk information // - std::vector<ChunkType> 
chunksToMerge; + std::vector<BSONObj> chunkBoundaries; + chunkBoundaries.push_back(minKey); ChunkType itChunk; itChunk.setMin(minKey); itChunk.setMax(minKey); itChunk.setNS(nss.ns()); - itChunk.setShard(gss->getShardName()); + itChunk.setShard(shardingState->getShardName()); while (itChunk.getMax().woCompare(maxKey) < 0 && metadata->getNextChunk(itChunk.getMax(), &itChunk)) { + chunkBoundaries.push_back(itChunk.getMax()); chunksToMerge.push_back(itChunk); } if (chunksToMerge.empty()) { - *errMsg = stream() << "could not merge chunks, collection " << nss.ns() - << " range starting at " << redact(minKey) << " and ending at " - << redact(maxKey) << " does not belong to shard " << gss->getShardName(); + std::string errmsg = stream() + << "could not merge chunks, collection " << nss.ns() << " range starting at " + << redact(minKey) << " and ending at " << redact(maxKey) << " does not belong to shard " + << shardingState->getShardName(); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } // @@ -267,12 +195,12 @@ bool mergeChunks(OperationContext* txn, bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey); if (!minKeyInRange) { - *errMsg = stream() << "could not merge chunks, collection " << nss.ns() - << " range starting at " << redact(minKey) - << " does not belong to shard " << gss->getShardName(); + std::string errmsg = stream() + << "could not merge chunks, collection " << nss.ns() << " range starting at " + << redact(minKey) << " does not belong to shard " << shardingState->getShardName(); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } BSONObj lastDocMin = chunksToMerge.back().getMin(); @@ -281,89 +209,121 @@ bool mergeChunks(OperationContext* txn, bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0; if (!maxKeyInRange) { - *errMsg = stream() << "could not merge chunks, 
collection " << nss.ns() - << " range ending at " << redact(maxKey) << " does not belong to shard " - << gss->getShardName(); + std::string errmsg = stream() + << "could not merge chunks, collection " << nss.ns() << " range ending at " + << redact(maxKey) << " does not belong to shard " << shardingState->getShardName(); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0; bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0; if (!validRangeStartKey || !validRangeEndKey) { - *errMsg = - stream() << "could not merge chunks, collection " << nss.ns() - << " does not contain a chunk " - << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "") - << (!validRangeStartKey && !validRangeEndKey ? " or " : "") - << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : ""); - - warning() << *errMsg; - return false; + std::string errmsg = stream() + << "could not merge chunks, collection " << nss.ns() << " does not contain a chunk " + << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "") + << (!validRangeStartKey && !validRangeEndKey ? " or " : "") + << (!validRangeEndKey ? 
"ending at " + redact(maxKey.toString()) : ""); + + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } if (chunksToMerge.size() == 1) { - *errMsg = stream() << "could not merge chunks, collection " << nss.ns() - << " already contains chunk for " - << redact(rangeToString(minKey, maxKey)); + std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns() + << " already contains chunk for " + << redact(rangeToString(minKey, maxKey)); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } + // Look for hole in range for (size_t i = 1; i < chunksToMerge.size(); ++i) { if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) { - *errMsg = stream() << "could not merge chunks, collection " << nss.ns() - << " has a hole in the range " - << redact(rangeToString(minKey, maxKey)) << " at " - << redact(rangeToString(chunksToMerge[i - 1].getMax(), - chunksToMerge[i].getMin())); + std::string errmsg = stream() + << "could not merge chunks, collection " << nss.ns() << " has a hole in the range " + << redact(rangeToString(minKey, maxKey)) << " at " + << redact(rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin())); - warning() << *errMsg; - return false; + warning() << errmsg; + return Status(ErrorCodes::IllegalOperation, errmsg); } } - // - // Run apply ops command - // - ChunkVersion mergeVersion = metadata->getCollVersion(); - mergeVersion.incMinor(); - - // Ensure that the newly applied chunks would result in a correct metadata state - auto metadataAfterMerge = uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion)); + { + // Ensure that the newly applied chunks would result in a correct metadata state + ChunkVersion mergeVersion = metadata->getCollVersion(); + mergeVersion.incMinor(); - Status applyOpsStatus = runApplyOpsCmd(txn, chunksToMerge, shardVersion, mergeVersion); - if (!applyOpsStatus.isOK()) { - warning() 
<< redact(applyOpsStatus); - return false; + uassertStatusOK(metadata->cloneMerge(minKey, maxKey, mergeVersion)); } // - // Install merged chunk metadata + // Run _configsvrMergeChunks. // + MergeChunkRequest request{ + nss, shardingState->getShardName(), shardVersion.epoch(), chunkBoundaries}; - { - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); - - auto css = CollectionShardingState::get(txn, nss); - css->refreshMetadata(txn, std::move(metadataAfterMerge)); - } + auto configCmdObj = + request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + auto cmdResponseStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + configCmdObj, + Shard::RetryPolicy::kIdempotent); // - // Log change + // Refresh metadata to pick up new chunk definitions (regardless of the results returned from + // running _configsvrMergeChunk). // + { + ChunkVersion shardVersionAfterMerge; + refreshStatus = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersionAfterMerge); + + if (!refreshStatus.isOK()) { + std::string errmsg = str::stream() << "failed to refresh metadata for merge chunk [" + << redact(minKey) << "," << redact(maxKey) << ") " + << redact(refreshStatus); - BSONObj mergeLogEntry = buildMergeLogEntry(chunksToMerge, shardVersion, mergeVersion); + warning() << errmsg; + return Status(refreshStatus.code(), errmsg); + } + } - grid.catalogClient(txn)->logChange( - txn, "merge", nss.ns(), mergeLogEntry, ShardingCatalogClient::kMajorityWriteConcern); + // If we failed to get any response from the config server at all, despite retries, then we + // should just go ahead and fail the whole operation. + if (!cmdResponseStatus.isOK()) { + return cmdResponseStatus.getStatus(); + } + + // If _configsvrMergeChunk returned an error, look at this shard's metadata to determine if + // the merge actually did happen. 
This can happen if there's a network error getting the + // response from the first call to _configsvrMergeChunk, but it actually succeeds, thus the + // automatic retry fails with a precondition violation, for example. + auto commandStatus = std::move(cmdResponseStatus.getValue().commandStatus); + auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus); + + if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) && + _checkMetadataForSuccess(txn, nss, minKey, maxKey)) { + + LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey) + << ") has already been committed."; + } else if (!commandStatus.isOK()) { + std::string errmsg = str::stream() << "Failed to commit chunk merge" + << causedBy(redact(commandStatus)); + return Status(commandStatus.code(), errmsg); + } else if (!writeConcernStatus.isOK()) { + std::string errmsg = str::stream() << "Failed to commit chunk merge" + << causedBy(redact(writeConcernStatus)); + return Status(writeConcernStatus.code(), errmsg); + } - return true; + return Status::OK(); } class MergeChunksCommand : public Command { @@ -492,7 +452,8 @@ public: return false; } - return mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch, &errmsg); + auto mergeStatus = mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch); + return appendCommandStatus(result, mergeStatus); } } mergeChunksCmd; |