diff options
Diffstat (limited to 'src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp')
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 176 |
1 files changed, 131 insertions, 45 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 0b68b839712..2272599806e 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -33,20 +33,26 @@ #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" +#include <fmt/format.h> + #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" #include "mongo/client/read_preference.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/ops/write_ops_retryability.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/migration_source_manager.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_key_index_util.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_statistics.h" @@ -69,6 +75,8 @@ namespace mongo { namespace { +using namespace fmt::literals; + const char kRecvChunkStatus[] = "_recvChunkStatus"; const char kRecvChunkCommit[] = "_recvChunkCommit"; const char kRecvChunkAbort[] = "_recvChunkAbort"; @@ -107,9 +115,8 @@ BSONObj createRequestWithSessionId(StringData commandName, return builder.obj(); } -BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation, - repl::OpTypeEnum opType) { - switch (opType) { +BSONObj getDocumentKeyFromReplOperation(repl::ReplOperation replOperation) { + switch (replOperation.getOpType()) { case repl::OpTypeEnum::kInsert: case repl::OpTypeEnum::kDelete: return replOperation.getObject(); @@ -168,6 +175,30 @@ private: const repl::OpTime _opTime; }; +LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler( + LogicalSessionId lsid, + const std::vector<repl::OplogEntry>& stmts, + repl::OpTime prepareOrCommitOpTime) + : _lsid(std::move(lsid)), _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) { + _stmts.reserve(stmts.size()); + _ownedReplBSONObj.reserve(stmts.size()); + + for (const auto& op : stmts) { + auto ownedBSON = op.getDurableReplOperation().toBSON().getOwned(); + _ownedReplBSONObj.push_back(ownedBSON); + _stmts.push_back( + repl::ReplOperation::parse({"MigrationChunkClonerSource_toReplOperation"}, ownedBSON)); + } +} + +LogTransactionOperationsForShardingHandler::LogTransactionOperationsForShardingHandler( + LogicalSessionId lsid, + const std::vector<repl::ReplOperation>& stmts, + repl::OpTime prepareOrCommitOpTime) + : _lsid(std::move(lsid)), + _stmts(stmts), + _prepareOrCommitOpTime(std::move(prepareOrCommitOpTime)) {} + void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestamp>) { std::set<NamespaceString> namespacesTouchedByTransaction; @@ -225,7 +256,7 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType); + auto preImageDocKey = getDocumentKeyFromReplOperation(stmt); auto idElement = preImageDocKey["_id"]; if (idElement.eoo()) { @@ -235,52 +266,35 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto const& minKey = cloner->_args.getMin().get(); - auto const& maxKey = cloner->_args.getMax().get(); - auto const& shardKeyPattern = cloner->_shardKeyPattern; - - // Note: This assumes that prepared transactions will always have post document key - // set. There is a small window where create collection coordinator releases the critical - // section and before it writes down the chunks for non-empty collections. So in theory, - // it is possible to have a prepared transaction while collection is unsharded - // and becomes sharded midway. This doesn't happen in practice because the only way to - // have a prepared transactions without being sharded is by directly connecting to the - // shards and manually preparing the transaction. Another exception is when transaction - // is prepared on an older version that doesn't set the post image document key. - auto postImageDocKey = stmt.getPostImageDocumentKey(); - if (postImageDocKey.isEmpty()) { - LOGV2_WARNING( - 6836102, - "Migration encountered a transaction operation without a post image document key", - "preImageDocKey"_attr = preImageDocKey); - } else { - auto postShardKeyValues = - shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); - fassert(6836100, !postShardKeyValues.isEmpty()); - - if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { - // If the preImageDoc is not in range but the postImageDoc was, we know that the - // document has changed shard keys and no longer belongs in the chunk being cloned. - // We will model the deletion of the preImage document so that the destination chunk - // does not receive an outdated version of this document. - - auto preImageShardKeyValues = - shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); - fassert(6836101, !preImageShardKeyValues.isEmpty()); - - if (opType == repl::OpTypeEnum::kUpdate && - isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { - opType = repl::OpTypeEnum::kDelete; - idElement = postImageDocKey["_id"]; - } else { + if (opType == repl::OpTypeEnum::kUpdate) { + auto const& shardKeyPattern = cloner->_shardKeyPattern; + auto preImageShardKeyValues = + shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + + // If prepare was performed from another term, we will not have the post image doc key + // since it is not persisted in the oplog. + auto postImageDocKey = stmt.getPostImageDocumentKey(); + if (!postImageDocKey.isEmpty()) { + if (!cloner->_processUpdateForXferMod(preImageDocKey, postImageDocKey)) { + // We don't need to add this op to session migration if neither post or pre + // image doc falls within the chunk range. continue; } + } else { + // We can't perform reads here using the same recovery unit because the transaction + // is already committed. We instead defer performing the reads when xferMods command + // is called. Also allow this op to be added to session migration since we can't + // tell whether post image doc will fall within the chunk range. If it turns out + // both preImage and postImage doc don't fall into the chunk range, it is not wrong + // for this op to be added to session migration, but it will result in wasted work + // and unneccesary extra oplog storage on the destination. + cloner->_deferProcessingForXferMod(preImageDocKey); } + } else { + cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); } addToSessionMigrationOptimeQueueIfNeeded(cloner, nss, _prepareOrCommitOpTime); - - cloner->_addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); } } @@ -800,11 +814,78 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, return Status::OK(); } +bool MigrationChunkClonerSourceLegacy::_processUpdateForXferMod(const BSONObj& preImageDocKey, + const BSONObj& postImageDocKey) { + auto const& minKey = _args.getMin().value(); + auto const& maxKey = _args.getMax().value(); + + auto postShardKeyValues = _shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); + fassert(6836100, !postShardKeyValues.isEmpty()); + + auto opType = repl::OpTypeEnum::kUpdate; + auto idElement = preImageDocKey["_id"]; + + if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { + // If the preImageDoc is not in range but the postImageDoc was, we know that the + // document has changed shard keys and no longer belongs in the chunk being cloned. + // We will model the deletion of the preImage document so that the destination chunk + // does not receive an outdated version of this document. + + auto preImageShardKeyValues = + _shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + fassert(6836101, !preImageShardKeyValues.isEmpty()); + + if (!isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { + return false; + } + + opType = repl::OpTypeEnum::kDelete; + idElement = postImageDocKey["_id"]; + } + + _addToTransferModsQueue(idElement.wrap(), getOpCharForCrudOpType(opType), {}); + + return true; +} + +void MigrationChunkClonerSourceLegacy::_deferProcessingForXferMod(const BSONObj& preImageDocKey) { + stdx::lock_guard<Latch> sl(_mutex); + _deferredReloadOrDeletePreImageDocKeys.push_back(preImageDocKey.getOwned()); + _deferredUntransferredOpsCounter++; +} + +void MigrationChunkClonerSourceLegacy::_processDeferredXferMods(OperationContext* opCtx, + Database* db) { + std::vector<BSONObj> deferredReloadOrDeletePreImageDocKeys; + + { + stdx::unique_lock lk(_mutex); + deferredReloadOrDeletePreImageDocKeys.swap(_deferredReloadOrDeletePreImageDocKeys); + } + + for (const auto& preImageDocKey : deferredReloadOrDeletePreImageDocKeys) { + auto idElement = preImageDocKey["_id"]; + BSONObj newerVersionDoc; + if (!Helpers::findById(opCtx, db, nss().ns(), BSON("_id" << idElement), newerVersionDoc)) { + // If the document can no longer be found, this means that another later op must have + // deleted it. That delete would have been captured by the xferMods so nothing else to + // do here. + continue; + } + + auto postImageDocKey = + CollectionMetadata::extractDocumentKey(&_shardKeyPattern, newerVersionDoc); + static_cast<void>(_processUpdateForXferMod(preImageDocKey, postImageDocKey)); + } +} + Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder) { dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS)); + _processDeferredXferMods(opCtx, db); + std::list<BSONObj> deleteList; std::list<BSONObj> updateList; @@ -849,6 +930,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, _untransferredDeletesCounter = _deleted.size(); _reload.splice(_reload.cbegin(), updateList); _untransferredUpsertsCounter = _reload.size(); + _deferredUntransferredOpsCounter = _deferredReloadOrDeletePreImageDocKeys.size(); return Status::OK(); } @@ -863,6 +945,8 @@ void MigrationChunkClonerSourceLegacy::_cleanup() { _untransferredUpsertsCounter = 0; _deleted.clear(); _untransferredDeletesCounter = 0; + _deferredReloadOrDeletePreImageDocKeys.clear(); + _deferredUntransferredOpsCounter = 0; } StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::_callRecipient(OperationContext* opCtx, @@ -1111,7 +1195,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC stdx::lock_guard<Latch> sl(_mutex); int64_t untransferredModsSizeBytes = _untransferredDeletesCounter * _averageObjectIdSize + - _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs; + (_untransferredUpsertsCounter + _deferredUntransferredOpsCounter) * + _averageObjectSizeForCloneLocs; if (_forceJumbo && _jumboChunkCloneState) { LOGV2(21992, @@ -1177,6 +1262,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "moveChunk data transfer within threshold to allow write blocking", "_untransferredUpsertsCounter"_attr = _untransferredUpsertsCounter, "_untransferredDeletesCounter"_attr = _untransferredDeletesCounter, + "_deferredUntransferredOpsCounter"_attr = _deferredUntransferredOpsCounter, "_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs, "_averageObjectIdSize"_attr = _averageObjectIdSize, "untransferredModsSizeBytes"_attr = untransferredModsSizeBytes, |